• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

buf_writer.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/mng/buf_writer.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
00007  *
00008  *  Distributed under the Boost Software License, Version 1.0.
00009  *  (See accompanying file LICENSE_1_0.txt or copy at
00010  *  http://www.boost.org/LICENSE_1_0.txt)
00011  **************************************************************************/
00012 
00013 #ifndef STXXL_BUFFERED_WRITER_HEADER
00014 #define STXXL_BUFFERED_WRITER_HEADER
00015 
00016 #include <vector>
00017 #include <queue>
00018 
00019 #include <stxxl/bits/io/disk_queues.h>
00020 
00021 
00022 __STXXL_BEGIN_NAMESPACE
00023 
00030 
00031 
00035 template <typename block_type>
00036 class buffered_writer
00037 {
00038     buffered_writer() { }
00039 
00040 protected:
00041     typedef typename block_type::bid_type bid_type;
00042 
00043     const unsigned_type nwriteblocks;
00044     block_type * write_buffers;
00045     bid_type * write_bids;
00046     request_ptr * write_reqs;
00047     const unsigned_type writebatchsize;
00048 
00049     std::vector<int_type> free_write_blocks;            // contains free write blocks
00050     std::vector<int_type> busy_write_blocks;            // blocks that are in writing, notice that if block is not in free_
00051     // an not in busy then block is not yet filled
00052 
00053     struct batch_entry
00054     {
00055         stxxl::int64 offset;
00056         int_type ibuffer;
00057         batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { }
00058     };
00059     struct batch_entry_cmp
00060     {
00061         bool operator () (const batch_entry & a, const batch_entry & b) const
00062         {
00063             return (a.offset > b.offset);
00064         }
00065     };
00066 
00067     typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
00068     batch_type batch_write_blocks;      // sorted sequence of blocks to write
00069 
00070 public:
00075     buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) :
00076         nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
00077         writebatchsize(write_batch_size ? write_batch_size : 1)
00078     {
00079         write_buffers = new block_type[nwriteblocks];
00080         write_reqs = new request_ptr[nwriteblocks];
00081         write_bids = new bid_type[nwriteblocks];
00082 
00083         for (unsigned_type i = 0; i < nwriteblocks; i++)
00084             free_write_blocks.push_back(i);
00085 
00086         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00087     }
00090     block_type * get_free_block()
00091     {
00092         int_type ibuffer;
00093         for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
00094              it != busy_write_blocks.end(); ++it)
00095         {
00096             if (write_reqs[ibuffer = (*it)]->poll())
00097             {
00098                 busy_write_blocks.erase(it);
00099                 free_write_blocks.push_back(ibuffer);
00100 
00101                 break;
00102             }
00103         }
00104         if (UNLIKELY(free_write_blocks.empty()))
00105         {
00106             int_type size = busy_write_blocks.size();
00107             request_ptr * reqs = new request_ptr[size];
00108             int_type i = 0;
00109             for ( ; i < size; ++i)
00110             {
00111                 reqs[i] = write_reqs[busy_write_blocks[i]];
00112             }
00113             int_type completed = wait_any(reqs, size);
00114             int_type completed_global = busy_write_blocks[completed];
00115             delete[] reqs;
00116             busy_write_blocks.erase(busy_write_blocks.begin() + completed);
00117 
00118             return (write_buffers + completed_global);
00119         }
00120         ibuffer = free_write_blocks.back();
00121         free_write_blocks.pop_back();
00122 
00123         return (write_buffers + ibuffer);
00124     }
00130     block_type * write(block_type * filled_block, const bid_type & bid)        // writes filled_block and returns a new block
00131     {
00132         if (batch_write_blocks.size() >= writebatchsize)
00133         {
00134             // flush batch
00135             while (!batch_write_blocks.empty())
00136             {
00137                 int_type ibuffer = batch_write_blocks.top().ibuffer;
00138                 batch_write_blocks.pop();
00139 
00140                 if (write_reqs[ibuffer].valid())
00141                     write_reqs[ibuffer]->wait();
00142 
00143                 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00144 
00145                 busy_write_blocks.push_back(ibuffer);
00146             }
00147         }
00148         //    STXXL_MSG("Adding write request to batch");
00149 
00150         int_type ibuffer = filled_block - write_buffers;
00151         write_bids[ibuffer] = bid;
00152         batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
00153 
00154         return get_free_block();
00155     }
00157     void flush()
00158     {
00159         int_type ibuffer;
00160         while (!batch_write_blocks.empty())
00161         {
00162             ibuffer = batch_write_blocks.top().ibuffer;
00163             batch_write_blocks.pop();
00164 
00165             if (write_reqs[ibuffer].valid())
00166                 write_reqs[ibuffer]->wait();
00167 
00168             write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00169 
00170             busy_write_blocks.push_back(ibuffer);
00171         }
00172         for (std::vector<int_type>::const_iterator it =
00173                  busy_write_blocks.begin();
00174              it != busy_write_blocks.end(); it++)
00175         {
00176             ibuffer = *it;
00177             write_reqs[ibuffer]->wait();
00178         }
00179 
00180         assert(batch_write_blocks.empty());
00181         free_write_blocks.clear();
00182         busy_write_blocks.clear();
00183 
00184         for (unsigned_type i = 0; i < nwriteblocks; i++)
00185             free_write_blocks.push_back(i);
00186     }
00187 
00189     virtual ~buffered_writer()
00190     {
00191         int_type ibuffer;
00192         while (!batch_write_blocks.empty())
00193         {
00194             ibuffer = batch_write_blocks.top().ibuffer;
00195             batch_write_blocks.pop();
00196 
00197             if (write_reqs[ibuffer].valid())
00198                 write_reqs[ibuffer]->wait();
00199 
00200             write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00201 
00202             busy_write_blocks.push_back(ibuffer);
00203         }
00204         for (std::vector<int_type>::const_iterator it =
00205                  busy_write_blocks.begin();
00206              it != busy_write_blocks.end(); it++)
00207         {
00208             ibuffer = *it;
00209             write_reqs[ibuffer]->wait();
00210         }
00211 
00212         delete[] write_reqs;
00213         delete[] write_buffers;
00214         delete[] write_bids;
00215     }
00216 };
00217 
00219 
00220 __STXXL_END_NAMESPACE
00221 
00222 #endif // !STXXL_BUFFERED_WRITER_HEADER

Generated by  doxygen 1.7.1