19 #if STXXL_HAVE_WBTL_FILE 
   // Default the WBTL trace macro to STXXL_VERBOSE2 unless the build already
   // defined it.
   // NOTE(review): the matching #endif is not visible in this excerpt.
   27 #ifndef STXXL_VERBOSE_WBTL 
   28 #define STXXL_VERBOSE_WBTL STXXL_VERBOSE2 
   // Tail of the wbtl_file constructor. The opening of the signature is not
   // visible in this excerpt; backend_file and write_buffers, both used below,
   // are presumably declared there - TODO confirm.
   35     size_type write_buffer_size,
 
   37     int queue_id, 
int allocator_id)
 
   // Initializer list: queue_id and allocator_id go to the disk_queued_file
   // base, backend_file becomes the underlying storage.
   38     : disk_queued_file(queue_id, allocator_id), storage(backend_file),
 
   39       sz(0), write_block_size(write_buffer_size),
 
   // curpos starts equal to write_block_size, i.e. the current buffer has no
   // room left, so the capacity test in swrite is true for any non-empty
   // write as long as curpos has not been changed in between.
   40       free_bytes(0), curbuf(1), curpos(write_block_size)
 
   // Exactly two write buffers are supported by this implementation.
   43     assert(write_buffers == 2); 
 
   // Allocate both write buffers: write_block_size bytes each, aligned to
   // STXXL_BLOCK_ALIGN.
   44     write_buffer[0] = 
static_cast<char*
>(stxxl::aligned_alloc<STXXL_BLOCK_ALIGN>(write_block_size));
 
   45     write_buffer[1] = 
static_cast<char*
>(stxxl::aligned_alloc<STXXL_BLOCK_ALIGN>(write_block_size));
 
   // offset_type(-1) marks a buffer that has no physical address assigned yet;
   // swrite compares against this value before writing a buffer to storage.
   46     buffer_address[0] = offset_type(-1);
 
   47     buffer_address[1] = offset_type(-1);
 
   // Destructor: releases the two aligned write buffers, in the reverse order
   // of their allocation.
   // NOTE(review): the lines following the two deallocations are not visible
   // in this excerpt, so it cannot be confirmed here whether storage is
   // released as well.
   50 wbtl_file::~wbtl_file()
 
   52     stxxl::aligned_dealloc<STXXL_BLOCK_ALIGN>(write_buffer[1]);
 
   53     stxxl::aligned_dealloc<STXXL_BLOCK_ALIGN>(write_buffer[0]);
 
   // Serves one request by dispatching on its type.
   //   buffer - memory to read into or write from
   //   offset - logical offset of the request within this file
   //   bytes  - length of the request in bytes
   //   type   - request::READ selects sread
   // NOTE(review): the branch structure around the swrite call is not visible
   // in this excerpt; presumably it is the else branch taken for every type
   // other than READ - confirm.
   58 void wbtl_file::serve(
void* buffer, offset_type offset, size_type 
bytes,
 
   59                       request::request_type type)
 
   61     if (type == request::READ)
 
   64         sread(buffer, offset, bytes);
 
   69         swrite(buffer, offset, bytes);
 
   // NOTE(review): only the signature of lock() is visible in this excerpt;
   // the body is missing, so its behavior cannot be documented from here.
   73 void wbtl_file::lock()
 
   // NOTE(review): only the signature of size() is visible in this excerpt;
   // presumably it reports the sz member - confirm against the full source.
   78 wbtl_file::offset_type wbtl_file::size()
 
   // Grows the file to newsize bytes. Shrinking is not supported (asserted).
   // Runs with mapping_mutex held.
   83 void wbtl_file::set_size(offset_type newsize)
 
   85     scoped_mutex_lock mapping_lock(mapping_mutex);
 
   86     assert(sz <= newsize); 
 
   // NOTE(review): the guard that encloses the following indented statements
   // is not visible in this excerpt.
   // The newly gained range starting at the old size sz, newsize - sz bytes
   // long, is handed to the free list before the backend file is resized.
   88         _add_free_region(sz, newsize - sz);
 
   89         storage->set_size(newsize);
 
   // NOTE(review): the update of sz itself is not visible here; this assert
   // only holds if sz was set to the new size on the missing line - confirm.
   91         assert(sz == storage->size());
 
   // Stream-formatting helpers for the trace and error messages in this file.
   // FMT_A_S: address/size pair, both in hexadecimal, zero-filled to 8 digits.
   // FMT_A_C: address in width 8 followed by a decimal count in parentheses.
   // FMT_A:   a single address in width 8.
   // FMT_A_C and FMT_A do not select the number base or fill character
   // themselves; every visible use follows an FMT_A_S in the same stream
   // expression, whose std::hex and std::setfill settings stay in effect.
   95 #define FMT_A_S(_addr_, _size_) "0x" << std::hex << std::setfill('0') << std::setw(8) << (_addr_) << "/0x" << std::setw(8) << (_size_) 
   96         #define FMT_A_C(_addr_, _size_) "0x" << std::setw(8) << (_addr_) << "(" << std::dec << (_size_) << ")" 
   97         #define FMT_A(_addr_) "0x" << std::setw(8) << (_addr_) 
  // Body of the discard routine. Its signature line is not visible in this
  // excerpt; the statements below use the names offset (logical address) and
  // size (length in bytes). Runs with mapping_mutex held.
  102     scoped_mutex_lock mapping_lock(mapping_mutex);
 
  // Translate the logical offset to its physical location, if one is mapped.
  103     sortseq::iterator physical = address_mapping.find(offset);
 
  // In the trace output 0xffffffff stands for: no mapping found.
  104     STXXL_VERBOSE_WBTL(
"wbtl:discard l" << FMT_A_S(offset, size) << 
" @    p" << FMT_A(physical != address_mapping.end() ? physical->second : 0xffffffff));
 
  105     if (physical == address_mapping.end()) {
 
  // NOTE(review): the body of the not-found branch and the start of the
  // alternative branch are missing from this excerpt; judging by indentation
  // the statements below run only when a mapping was found - confirm.
  110         offset_type physical_offset = physical->second;
 
  // Drop the logical-to-physical entry and return the physical range to the
  // free list.
  111         address_mapping.erase(physical);
 
  112         _add_free_region(physical_offset, size);
 
  // Remove the matching physical-to-logical entry as well. A missing entry is
  // only reported through STXXL_ERRMSG.
  113         place_map::iterator reverse = reverse_mapping.find(physical_offset);
 
  114         if (reverse == reverse_mapping.end()) {
 
  115             STXXL_ERRMSG(
"discard: reverse mapping not found: " << FMT_A_S(offset, size) << 
" ==> " << 
"???");
 
  // The reverse entry must point back at the logical offset being discarded.
  118             assert(offset == (reverse->second).first);
 
  119             reverse_mapping.erase(reverse);
 
  // Forward the discard of the physical range to the backend file.
  121         storage->discard(physical_offset, size);
 
  // Returns the physical range starting at offset, size bytes long, to the
  // free list. free_space maps the start of each free region to its length.
  // A free region that ends exactly where the new one starts, or starts
  // exactly where it ends, is erased and folded into the new region before
  // the merged region is stored.
  // NOTE(review): braces, else keywords and several statements of this
  // function are missing from this excerpt; nesting below is only indicated
  // by indentation, and no update of free_bytes is visible here.
  126 void wbtl_file::_add_free_region(offset_type offset, offset_type size)
 
  129     STXXL_VERBOSE_WBTL(
"wbtl:addfre  p" << FMT_A_S(offset, size) << 
" F <= f" << FMT_A_C(free_bytes, free_space.size()));
 
  // Working copy of the region; it grows while neighbours are merged in.
  130     offset_type region_pos = offset;
 
  131     offset_type region_size = size;
 
  132     if (!free_space.empty())
 
  // succ: first free region that starts after region_pos.
  134         sortseq::iterator succ = free_space.upper_bound(region_pos);
 
  // NOTE(review): pred is initialised from succ; the source line numbered 136
  // is absent from this excerpt - presumably it steps pred back to the region
  // in front of succ - confirm.
  135         sortseq::iterator pred = succ;
 
  // Reject a region that overlaps an already free neighbour.
  137         check_corruption(region_pos, region_size, pred, succ);
 
  // Case: no free region starts after the new one, only a predecessor can be
  // adjacent.
  138         if (succ == free_space.end())
 
  140             if (pred == free_space.end())
 
  // NOTE(review): by indentation this assert sits inside the branch where
  // pred equals end(), where it can only fail - confirm against the full
  // source.
  143                 assert(pred != free_space.end());
 
  // Predecessor ends exactly at region_pos: absorb it.
  145             if ((*pred).first + (*pred).second == region_pos)
 
  148                 region_size += (*pred).second;
 
  149                 region_pos = (*pred).first;
 
  150                 free_space.erase(pred);
 
  // Case: a successor exists and the free list holds more than one region.
  155             if (free_space.size() > 1)
 
  // Remember, before succ may be erased, whether a region precedes it.
  157                 bool succ_is_not_the_first = (succ != free_space.begin());
 
  // Successor starts exactly at the end of the new region: absorb it.
  158                 if ((*succ).first == region_pos + region_size)
 
  161                     region_size += (*succ).second;
 
  162                     free_space.erase(succ);
 
  164                 if (succ_is_not_the_first)
 
  166                     if (pred == free_space.end())
 
  169                         assert(pred != free_space.end());
 
  // Predecessor ends exactly at region_pos: absorb it.
  171                     if ((*pred).first + (*pred).second == region_pos)
 
  174                         region_size += (*pred).second;
 
  175                         region_pos = (*pred).first;
 
  176                         free_space.erase(pred);
 
  // NOTE(review): presumably the branch for a free list with a single region,
  // where only the successor has to be checked - the enclosing else is not
  // visible.
  182                 if ((*succ).first == region_pos + region_size)
 
  185                     region_size += (*succ).second;
 
  186                     free_space.erase(succ);
 
  // Store the merged region.
  192     free_space[region_pos] = region_size;
 
  194     STXXL_VERBOSE_WBTL(
"wbtl:free    p" << FMT_A_S(region_pos, region_size) << 
" F => f" << FMT_A_C(free_bytes, free_space.size()));
 
  // Synchronous read of bytes bytes at logical offset into buffer.
  // The logical offset is translated to a physical one; the data is then
  // copied from whichever write buffer covers that physical offset, or read
  // from the backend storage otherwise. Holds buffer_mutex for the whole call.
  // NOTE(review): the variable cached used in the final trace is declared and
  // assigned on lines that are missing from this excerpt.
  197 void wbtl_file::sread(
void* buffer, offset_type offset, size_type bytes)
 
  199     scoped_mutex_lock buffer_lock(buffer_mutex);
 
  201     offset_type physical_offset;
 
  // Logical-to-physical lookup under mapping_mutex.
  204         scoped_mutex_lock mapping_lock(mapping_mutex);
 
  205         sortseq::iterator physical = address_mapping.find(offset);
 
  206         if (physical == address_mapping.end()) {
 
  207             STXXL_ERRMSG(
"wbtl_read: mapping not found: " << FMT_A_S(offset, bytes) << 
" ==> " << 
"???");
 
  // 0xffffffff is the marker for: no mapping exists for this offset.
  // NOTE(review): if offset_type is wider than 32 bits this value is also a
  // representable physical offset - confirm this cannot collide.
  209             physical_offset = 0xffffffff;
 
  212             physical_offset = physical->second;
 
  // Hit in the current write buffer: copy straight out of memory.
  216     if (buffer_address[curbuf] <= physical_offset &&
 
  217         physical_offset < buffer_address[curbuf] + write_block_size)
 
  // The request must not run past the end of the buffer.
  220         assert(physical_offset + bytes <= buffer_address[curbuf] + write_block_size);
 
  221         memcpy(buffer, write_buffer[curbuf] + (physical_offset - buffer_address[curbuf]), bytes);
 
  222         stats::get_instance()->read_cached(bytes);
 
  // Hit in the other write buffer.
  225     else if (buffer_address[1 - curbuf] <= physical_offset &&
 
  226              physical_offset < buffer_address[1 - curbuf] + write_block_size)
 
  229         assert(physical_offset + bytes <= buffer_address[1 - curbuf] + write_block_size);
 
  230         memcpy(buffer, write_buffer[1 - curbuf] + (physical_offset - buffer_address[1 - curbuf]), bytes);
 
  231         stats::get_instance()->read_cached(bytes);
 
  // No mapping: the destination is filled with the value of one byte taken
  // from freshly malloc-ed memory that is never written first.
  // NOTE(review): reading that byte yields an indeterminate value, and the
  // malloc result is not checked; presumably this is deliberate so memory
  // checkers flag later use of the data - confirm. The matching free is not
  // visible in this excerpt.
  234     else if (physical_offset == 0xffffffff) {
 
  236         char* uninitialized = (
char*)
malloc(
sizeof(
char));
 
  237         memset(buffer, *uninitialized, bytes);
 
  // Not buffered: start an asynchronous read on the backend storage.
  // NOTE(review): waiting for req to complete is not visible in this excerpt.
  243         request_ptr req = storage->aread(buffer, physical_offset, bytes);
 
  246     STXXL_VERBOSE_WBTL(
"wbtl:sread   l" << FMT_A_S(offset, bytes) << 
" @    p" << FMT_A(physical_offset) << 
" " << std::dec << cached);
 
  // Synchronous write of bytes bytes from buffer at logical offset.
  // The data is appended to the current write buffer at curpos and the
  // logical offset is mapped to the resulting physical position. When the
  // data does not fit, the current buffer is first written to the backend
  // storage asynchronously. Holds buffer_mutex for the whole call.
  // NOTE(review): several statements are missing from this excerpt, among
  // them whatever follows the early unlock below, the switch to the other
  // buffer, and the advance of curpos after the copy.
  250 void wbtl_file::swrite(
void* buffer, offset_type offset, size_type bytes)
 
  252     scoped_mutex_lock buffer_lock(buffer_mutex);
 
  // Check under mapping_mutex whether this logical offset is already mapped.
  255         scoped_mutex_lock mapping_lock(mapping_mutex);
 
  256         sortseq::iterator physical = address_mapping.find(offset);
 
  257         STXXL_VERBOSE_WBTL(
"wbtl:swrite  l" << FMT_A_S(offset, bytes) << 
" @ <= p" <<
 
  258                            FMT_A_C(physical != address_mapping.end() ? physical->second : 0xffffffff, address_mapping.size()));
 
  259         if (physical != address_mapping.end()) {
 
  // The mapping lock is released early when an old mapping exists.
  // NOTE(review): presumably so the old location can be discarded by a
  // routine that takes mapping_mutex itself - the call is not visible here.
  260             mapping_lock.unlock();
 
  // Not enough room left in the current buffer.
  266     if (bytes > write_block_size - curpos)
 
  // Only a buffer that already has a physical address is written out.
  270         if (buffer_address[curbuf] != offset_type(-1)) {
 
  271             STXXL_VERBOSE_WBTL(
"wbtl:w2disk  p" << FMT_A_S(buffer_address[curbuf], write_block_size));
 
  // The unused tail of the block goes back to the free list.
  274             if (curpos < write_block_size)
 
  275                 _add_free_region(buffer_address[curbuf] + curpos, write_block_size - curpos);
 
  // Wait for the previous backend write, if any, before issuing the next one.
  277             if (backend_request.get()) {
 
  278                 backend_request->wait(
false);
 
  // The whole block of write_block_size bytes is written asynchronously.
  281             backend_request = storage->awrite(write_buffer[curbuf], buffer_address[curbuf], write_block_size);
 
  // Assign a fresh physical block to the buffer that will receive the data.
  286         buffer_address[curbuf] = get_next_write_block();
 
  289     assert(bytes <= write_block_size - curpos);
 
  // Append the data to the write buffer and count it as a cached write.
  292     memcpy(write_buffer[curbuf] + curpos, buffer, bytes);
 
  293     stats::get_instance()->write_cached(bytes);
 
  // Record the forward and the reverse mapping under mapping_mutex.
  295     scoped_mutex_lock mapping_lock(mapping_mutex);
 
  296     address_mapping[offset] = buffer_address[curbuf] + curpos;
 
  297     reverse_mapping[buffer_address[curbuf] + curpos] = place(offset, bytes);
 
  298     STXXL_VERBOSE_WBTL(
"wbtl:swrite  l" << FMT_A_S(offset, bytes) << 
" @ => p" << FMT_A_C(buffer_address[curbuf] + curpos, address_mapping.size()));
 
  // Takes one block of write_block_size bytes out of the free list.
  // NOTE(review): the predicate passed to std::find_if, the return statements
  // and the handling of the case where no region is found are missing from
  // this excerpt; presumably the start of the chosen region is returned -
  // confirm.
  302 wbtl_file::offset_type wbtl_file::get_next_write_block()
 
  // Linear search through the free regions.
  305     sortseq::iterator space =
 
  306         std::find_if(free_space.begin(), free_space.end(),
 
  309     if (space != free_space.end())
 
  311         offset_type region_pos = (*space).first;
 
  312         offset_type region_size = (*space).second;
 
  // The region is removed; if it was larger than one block, the remainder
  // behind the first write_block_size bytes is inserted again.
  313         free_space.erase(space);
 
  314         if (region_size > write_block_size)
 
  315             free_space[region_pos + write_block_size] = region_size - write_block_size;
 
  317         free_bytes -= write_block_size;
 
  319         STXXL_VERBOSE_WBTL(
"wbtl:nextwb  p" << FMT_A_S(region_pos, write_block_size) << 
" F    f" << FMT_A_C(free_bytes, free_space.size()));
 
  // Sanity check used before a region is added to the free list.
  //   region_pos, region_size - the region about to be freed
  //   pred, succ              - neighbouring entries of free_space, or
  //                             free_space.end() when there is none
  // Throws bad_ext_alloc through STXXL_THROW when the region overlaps one of
  // the two neighbours, which is reported as a double deallocation.
  326 void wbtl_file::check_corruption(offset_type region_pos, offset_type region_size,
 
  327                                  sortseq::iterator pred, sortseq::iterator succ)
 
  329     if (pred != free_space.end())
 
  // The new region starts inside the predecessor.
  331         if (pred->first <= region_pos && pred->first + pred->second > region_pos)
 
  333             STXXL_THROW(bad_ext_alloc, 
"Error: double deallocation of external memory " <<
 
  334                         "System info: P " << pred->first << 
" " << pred->second << 
" " << region_pos);
 
  337     if (succ != free_space.end())
 
  // The successor starts inside the new region.
  339         if (region_pos <= succ->first && region_pos + region_size > succ->first)
 
  341             STXXL_THROW(bad_ext_alloc, 
"Error: double deallocation of external memory " 
  342                         << 
"System info: S " << region_pos << 
" " << region_size << 
" " << succ->first);
 
  // NOTE(review): only the signature of io_type() is visible in this excerpt;
  // the returned string cannot be documented from here.
  347 const char* wbtl_file::io_type()
 const 
  354 #endif  // #if STXXL_HAVE_WBTL_FILE 
void * malloc(size_t size)
#define STXXL_THROW(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message]". 
counting_ptr< request > request_ptr
A reference counting pointer for request. 
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request. 
#define STXXL_BEGIN_NAMESPACE
void STXXL_UNUSED(const U &)
#define STXXL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]". 
static const size_t bytes
number of bytes in uint_pair 
#define _STXXL_FORCE_SEQUENTIAL
void discard(StreamAlgorithm &in)
Reads stream content and discards it. Useful where you do not need the processed stream anymore...
#define STXXL_END_NAMESPACE