• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

buf_writer.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/mng/buf_writer.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
00007  *
00008  *  Distributed under the Boost Software License, Version 1.0.
00009  *  (See accompanying file LICENSE_1_0.txt or copy at
00010  *  http://www.boost.org/LICENSE_1_0.txt)
00011  **************************************************************************/
00012 
00013 #ifndef STXXL_BUFFERED_WRITER_HEADER
00014 #define STXXL_BUFFERED_WRITER_HEADER
00015 
00016 #include <vector>
00017 
00018 #include <stxxl/bits/mng/mng.h>
00019 
00020 
00021 __STXXL_BEGIN_NAMESPACE
00022 
00029 
00030 
00034 template <typename block_type>
00035 class buffered_writer
00036 {
00037     buffered_writer() { }
00038 
00039 protected:
00040     typedef typename block_type::bid_type bid_type;
00041 
00042     const unsigned_type nwriteblocks;
00043     block_type * write_buffers;
00044     bid_type * write_bids;
00045     request_ptr * write_reqs;
00046     const unsigned_type writebatchsize;
00047 
00048     std::vector<int_type> free_write_blocks;            // contains free write blocks
00049     std::vector<int_type> busy_write_blocks;            // blocks that are in writing, notice that if block is not in free_
00050     // an not in busy then block is not yet filled
00051 
00052     struct batch_entry
00053     {
00054         stxxl::int64 offset;
00055         int_type ibuffer;
00056         batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { }
00057     };
00058     struct batch_entry_cmp
00059     {
00060         bool operator () (const batch_entry & a, const batch_entry & b) const
00061         {
00062             return (a.offset > b.offset);
00063         }
00064     };
00065 
00066     typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
00067     batch_type batch_write_blocks;      // sorted sequence of blocks to write
00068 
00069 public:
00074     buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) :
00075         nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
00076         writebatchsize(write_batch_size ? write_batch_size : 1)
00077     {
00078         write_buffers = new block_type[nwriteblocks];
00079         write_reqs = new request_ptr[nwriteblocks];
00080         write_bids = new bid_type[nwriteblocks];
00081 
00082         for (unsigned_type i = 0; i < nwriteblocks; i++)
00083             free_write_blocks.push_back(i);
00084 
00085 
00086         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00087     }
00090     block_type * get_free_block()
00091     {
00092         int_type ibuffer;
00093         for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
00094              it != busy_write_blocks.end(); ++it)
00095         {
00096             if (write_reqs[ibuffer = (*it)]->poll())
00097             {
00098                 busy_write_blocks.erase(it);
00099                 free_write_blocks.push_back(ibuffer);
00100 
00101                 break;
00102             }
00103         }
00104         if (UNLIKELY(free_write_blocks.empty()))
00105         {
00106             int_type size = busy_write_blocks.size();
00107             request_ptr * reqs = new request_ptr[size];
00108             int_type i = 0;
00109             for ( ; i < size; ++i)
00110             {
00111                 reqs[i] = write_reqs[busy_write_blocks[i]];
00112             }
00113             int_type completed = wait_any(reqs, size);
00114             int_type completed_global = busy_write_blocks[completed];
00115             delete[] reqs;
00116             busy_write_blocks.erase(busy_write_blocks.begin() + completed);
00117 
00118             return (write_buffers + completed_global);
00119         }
00120         ibuffer = free_write_blocks.back();
00121         free_write_blocks.pop_back();
00122 
00123         return (write_buffers + ibuffer);
00124     }
00130     block_type * write(block_type * filled_block, const bid_type & bid)        // writes filled_block and returns a new block
00131     {
00132         if (batch_write_blocks.size() >= writebatchsize)
00133         {
00134             // flush batch
00135             while (!batch_write_blocks.empty())
00136             {
00137                 int_type ibuffer = batch_write_blocks.top().ibuffer;
00138                 batch_write_blocks.pop();
00139 
00140                 if (write_reqs[ibuffer].valid())
00141                     write_reqs[ibuffer]->wait();
00142 
00143                 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00144 
00145                 busy_write_blocks.push_back(ibuffer);
00146             }
00147         }
00148         //    STXXL_MSG("Adding write request to batch");
00149 
00150         int_type ibuffer = filled_block - write_buffers;
00151         write_bids[ibuffer] = bid;
00152         batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
00153 
00154         return get_free_block();
00155     }
00157     void flush()
00158     {
00159         int_type ibuffer;
00160         while (!batch_write_blocks.empty())
00161         {
00162             ibuffer = batch_write_blocks.top().ibuffer;
00163             batch_write_blocks.pop();
00164 
00165             if (write_reqs[ibuffer].valid())
00166                 write_reqs[ibuffer]->wait();
00167 
00168             write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00169 
00170             busy_write_blocks.push_back(ibuffer);
00171         }
00172         for (std::vector<int_type>::const_iterator it =
00173                  busy_write_blocks.begin();
00174              it != busy_write_blocks.end(); it++)
00175         {
00176             ibuffer = *it;
00177             write_reqs[ibuffer]->wait();
00178         }
00179 
00180         assert(batch_write_blocks.empty());
00181         free_write_blocks.clear();
00182         busy_write_blocks.clear();
00183 
00184         for (unsigned_type i = 0; i < nwriteblocks; i++)
00185             free_write_blocks.push_back(i);
00186     }
00187 
00189     virtual ~buffered_writer()
00190     {
00191         int_type ibuffer;
00192         while (!batch_write_blocks.empty())
00193         {
00194             ibuffer = batch_write_blocks.top().ibuffer;
00195             batch_write_blocks.pop();
00196 
00197             if (write_reqs[ibuffer].valid())
00198                 write_reqs[ibuffer]->wait();
00199 
00200             write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00201 
00202             busy_write_blocks.push_back(ibuffer);
00203         }
00204         for (std::vector<int_type>::const_iterator it =
00205                  busy_write_blocks.begin();
00206              it != busy_write_blocks.end(); it++)
00207         {
00208             ibuffer = *it;
00209             write_reqs[ibuffer]->wait();
00210         }
00211 
00212         delete[] write_reqs;
00213         delete[] write_buffers;
00214         delete[] write_bids;
00215     }
00216 };
00217 
00219 
00220 __STXXL_END_NAMESPACE
00221 
00222 #endif // !STXXL_BUFFERED_WRITER_HEADER

Generated by  doxygen 1.7.1