STXXL  1.4.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Tutorial for the Stream Package
Author
Timo Bingmann (2012-06-11)

This page gives a short introduction into the stream package. First the main abstractions are discussed and then some examples on how to utilize the existing algorithms are developed.

All example code can be found in examples/stream/stream1.cpp

In Generating Random Graphs using Streams another example is given, where an existing algorithm is "pipelined".

Abstraction, Interface and a Simple Example

The stream package is built around the abstract notion of an object being able to produce a sequence of output values. Only three simple operations are necessary:

  • Retrieval of the current value: prefix * operator
  • Advance to the next value in the sequence: prefix ++ operator
  • Indication of the sequence's end: empty() function

The most common place object that fits easily into this abstraction is the random generator. Actually, a random generator only requires two operations: it can be queried for its current value and be instructed to calculate/advance to new value. Of course the random sequence should be unbounded, so an empty() function would always be false. Nevertheless, this common-place example illustrates the purpose of the stream interface pretty well.

All stream objects must support the three operations above, they form the stream algorithm concept. In C++ a class conforms to this concept if it implements the following interface:

struct stream_object
{
// Type of the values in the output sequence.
typedef output_type value_type;
// Retrieval prefix * operator (like dereferencing a pointer or iterator).
const value_type& operator* () const;
// Prefix increment ++ operator, which advances the stream to the next value.
stream_object& operator++ ();
// Empty indicator. True if the last ++ operation could not fetch a value.
bool empty() const;
};

A very simple stream object that produces the sequence 1,2,3,4,....,1000 is shown in the following snippet:

struct counter_object
{
// This stream produces a sequence of integers.
typedef int value_type;
private:
// A class attribute to save the current value.
int m_current_value;
public:
// A constructor to set the initial value to 1.
counter_object()
: m_current_value(1)
{
}
// The retrieve operator returning the current value.
const value_type& operator* () const
{
return m_current_value;
}
// Increment operator advancing to the next integer.
counter_object& operator++ ()
{
++m_current_value;
return *this;
}
// Empty indicator, which in this case can check the current value.
bool empty() const
{
return (m_current_value > 1000);
}
};

After this verbose interface definition, the actual iteration over a stream object can be done as follows:

counter_object counter;
while (!counter.empty())
{
std::cout << *counter << " ";
++counter;
}
std::cout << std::endl;

For those who like to shorten everything into fewer lines, the above can also be expressed as a for loop:

for (counter_object cnt; !cnt.empty(); ++cnt)
{
std::cout << *cnt << " ";
}
std::cout << std::endl;

Both loops will print the following output:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [...] 995 996 997 998 999 1000

Pipelining: Plugging Stream Objects Together

The stream interface is so very useful for external memory algorithms because it represents the concept of sequential access to a stream of individual values. While the simple example above only works with integers, the value_type of streams will more often contain complex tuple structs with multiple components.

A stream algorithm can then be constructed from multiple stream objects that pass data from one to another. This notion of "plugging together" stream objects is used in the following example to calculate the square of each value of an integer sequence:

template <typename InputStream>
struct squaring_object
{
// This stream produces a sequence of integers.
typedef int value_type;
private:
// A reference to another stream of integers, which are our input.
InputStream& m_input_stream;
// A temporary value buffer to hold the current square for retrieval.
value_type m_current_value;
public:
// A constructor taking another stream of integers as input.
squaring_object(InputStream& input_stream)
: m_input_stream(input_stream)
{
if (!m_input_stream.empty())
{
m_current_value = *m_input_stream;
m_current_value = m_current_value * m_current_value;
}
}
// The retrieve operator returning the square of the input stream.
const value_type& operator* () const
{
return m_current_value;
}
// Increment operator: handled by incrementing the input stream.
squaring_object& operator++ ()
{
++m_input_stream;
if (!m_input_stream.empty())
{
m_current_value = *m_input_stream;
m_current_value = m_current_value * m_current_value;
}
return *this;
}
// Empty indicator: this stream is empty when the input stream is.
bool empty() const
{
return m_input_stream.empty();
}
};

For a beginner in stream object programming, the squaring example contains multiple unexpected, verbose complications.

  • We wish to allow many different integer sequences as input streams to the squaring class. For this we use template meta-programming and define squaring to take any class as InputStream template parameter. As yet, in C++ we cannot syntactically define which concepts the template parameters must fulfill, in this case one would require InputStream to implement the stream interface.
  • After defining the input stream class, one will usually need an instantiated object of that class inside the new stream class. Most common practice is to define references to other streams as class attributes, and have the actual objects be passed to the constructor of the new stream object.
    In the case of the squaring class, any InputStream object is accepted by the constructor and a reference is saved into m_input_stream.
  • As second attribute, the squaring class contains m_current_value. The additional temporary value is required in this case because operator*() must return a const-reference, so the square must actually be stored in a variable after it is calculated. Now note that the squaring operation in this version is implemented at two places: in the constructor and the operator++().
    This is necessary, because the stream concept requires that the first value be immediately available after construction! Therefore it must be calculated in the constructor, and this code is usually a duplicate to the action done in operator++(). A real implementation would probably combine the calculation code into a process() function and also do additional allocation work in the constructor.

An instance of the counter_object can be plugged into a squaring_object as done in the following example:

counter_object counter;
squaring_object<counter_object> squares(counter);
while (!squares.empty())
{
std::cout << *squares << " ";
++squares;
}
std::cout << std::endl;

The example outputs:

1 4 9 16 25 36 49 64 81 100 121 144 169 [...] 986049 988036 990025 992016 994009 996004 998001 1000000

Miscellaneous Utilities Provided by the Stream Package

The above examples are pure C++ interface manipulations and do not even require STXXL. However, when writing stream algorithms you can take advantage of the utilities provided by the stream package to create complex algorithms. Probably the most useful is the pair of sorting classes, which will be discussed after a few preliminaries.

More complex algorithms will most often use tuples as values passed from one stream to another. These tuples wrap all information fields of a specific piece of data. Simple tuples can be created using std::pair, tuples with larger number of components can use Boost.Tuple or just plain structs with multiple fields. (In the tuple case, the temporary value inside the stream struct can mostly be avoided.)

The stream package contains utilities to plug stream classes together to form complex algorithms. The following few examples are very basic algorithms:

Very often the input to a sequence of stream classes comes from an array or other container. In this case one requires an input stream object, which iterates through the container and outputs each element once. STXXL provides iterator2stream for this common purpose:

std::vector<int> intvector;
// (fill intvector)
// define stream class iterating over an integer vector
// instantiate the stream object, iterate from begin to end of intvector.
intstream_type intstream (intvector.begin(), intvector.end());
// plug in squaring object after vector iterator stream.
squaring_object<intstream_type> squares(intstream);

Most important: if the input container is a stxxl::vector, then one should use vector_iterator2stream, because this class will prefetch additional blocks from the vector while processing the stream.

// (fill intvector)
// define stream class iterating over an integer STXXL vector
// instantiate the stream object, iterate from begin to end of intvector using prefetching
intstream_type intstream (intvector.begin(), intvector.end());
// plug in squaring object after vector iterator stream.
squaring_object<intstream_type> squares(intstream);

The opposite to iterator2stream is to collect the output of a sequence of stream objects into a container or stxxl::vector. This operation is called materialize and also comes in the general version and a special version for the STXXL-vector, which uses asynchronous writes.

This example shows how to materialize a stream into a usual STL vector.

// construct the squared counter stream
counter_object counter;
squaring_object<counter_object> squares(counter);
// allocate vector of 100 integers
std::vector<int> intvector (100);
// materialize 100 integers from stream and put into vector
stxxl::stream::materialize(squares, intvector.begin(), intvector.end());

And the only modification needed to support larger data sets is to materialize to an STXXL vector:

// construct the squared counter stream
counter_object counter;
squaring_object<counter_object> squares(counter);
// allocate STXXL vector of 100 integers
stxxl::vector<int> intvector (100);
// materialize 100 integers from stream and put into STXXL vector
stxxl::stream::materialize(squares, intvector.begin(), intvector.end());

Sorting As Provided by the Stream Package

Maybe the most important set of tools in the stream package is the pairs of sorter classes runs_creator and runs_merger. The general way to sort a sequential input stream is to first consolidate a large number of input items in an internal memory buffer. Then when the buffer is full, it can be sorted in internal memory and subsequently written out to disk. This sorted sequence is then called a run. When the input stream is finished and the sorted output must be produced, theses sorted sequences can efficiently be merged using a tournament tree or similar multi-way comparison structure. (see Parallel Disk Sorting.)

STXXL implements this using two stream classes: runs_creator and runs_merger.

The following examples shows how to sort the integer sequence 1,2,...,1000 first by the right-most decimal digit, then by its absolute value (yes a somewhat constructed example, but it serves its purpose well.) For all sorters a comparator object is required which tells the sorter which of two objects is the smaller one. This is similar to the requirements of the usual STL, however, the STXXL sorters need to additional functions: min_value() and max_value() which are used as padding sentinels. These functions return the smallest and highest possible values of the given data type.

// define comparator class: compare right-most decimal and then absolute value
struct CompareMod10
{
// comparison operator() returning true if (a < b)
inline bool operator() (int a, int b) const
{
if ((a % 10) == (b % 10))
return a < b;
else
return (a % 10) < (b % 10);
}
// smallest possible integer value
int min_value() const { return INT_MIN; }
// largest possible integer value
int max_value() const { return INT_MAX; }
};

All sorters steps require an internal memory buffer. This size can be fixed using a parameter to runs_creator and runs_merger. The following example code instantiates a counter object, plugs this into a runs_creator which is followed by a runs_merger.

static const int ram_use = 10*1024*1024; // amount of memory to use in runs creation
counter_object counter; // the counter stream from first examples
// define a runs sorter for the counter stream which order by CompareMod10 object.
// instance of CompareMod10 comparator class
CompareMod10 comparemod10;
// instance of runs_creator which reads the counter stream.
rc_counter_type rc_counter (counter, comparemod10, ram_use);
// define a runs merger for the sorted runs from rc_counter.
// instance of runs_merger which merges sorted runs from rc_counter.
rm_counter_type rm_counter (rc_counter.result(), comparemod10, ram_use);
// read sorted stream: runs_merger also conforms to the stream interface.
while (!rm_counter.empty())
{
std::cout << *rm_counter << " ";
++rm_counter;
}
std::cout << std::endl;

The output of the code above is:

10 20 30 40 50 60 70 80 [...] 990 1000 1 11 21 31 41 51 61 [...] 909 919 929 939 949 959 969 979 989 999

Note that in the above example the input of the runs_creator is itself a stream. If however the data is not naturally available as a stream, one can use a variant of runs_creator which accepts input via a push() function. This is more useful when using an imperative programming style. Note that the runs_merger does not change.

static const int ram_use = 10*1024*1024; // amount of memory to use in runs creation
// define a runs sorter which accepts imperative push()s and orders by CompareMod10 object.
typedef stxxl::stream::runs_creator<stxxl::stream::use_push<int>, CompareMod10> rc_counter_type;
// instance of CompareMod10 comparator class.
CompareMod10 comparemod10;
// instance of runs_creator which waits for input.
rc_counter_type rc_counter (comparemod10, ram_use);
// write sequence of integers into runs
for (int i = 1; i <= 1000; ++i)
rc_counter.push(i);
// define a runs merger for the sorted runs from rc_counter.
// instance of runs_merger which merges sorted runs from rc_counter.
rm_counter_type rm_counter (rc_counter.result(), comparemod10, ram_use);
// read sorted stream: runs_merger also conforms to the stream interface.
while (!rm_counter.empty())
{
std::cout << *rm_counter << " ";
++rm_counter;
}
std::cout << std::endl;

And as the last example in this tutorial we show how to use stxxl::sorter, which combines runs_creator and runs_merger into one object. The sorter has two states: input and output. During input, new elements can be sorted using push(). Then to switch to output state, the function sort() is called, after which the sorter can be queried using the usual stream interface.

static const int ram_use = 10*1024*1024; // amount of memory to use in runs creation
// define a runs sorter which accepts imperative push()s and orders by CompareMod10 object.
typedef stxxl::sorter<int, CompareMod10> sr_counter_type;
// instance of CompareMod10 comparator class.
CompareMod10 comparemod10;
// instance of sorter which waits for input.
sr_counter_type sr_counter (comparemod10, ram_use);
// write sequence of integers into sorter, which creates sorted runs
for (int i = 1; i <= 1000; ++i)
sr_counter.push(i);
// signal sorter that the input stream is finished and switch to output mode.
sr_counter.sort();
// read sorted stream: sorter also conforms to the stream interface.
while (!sr_counter.empty())
{
std::cout << *sr_counter << " ";
++sr_counter;
}
std::cout << std::endl;

All three examples have the same output.