STXXL  1.4-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
wbtl_file.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * lib/io/wbtl_file.cpp
3  *
4  * a write-buffered-translation-layer pseudo file
5  *
6  * Part of the STXXL. See http://stxxl.sourceforge.net
7  *
8  * Copyright (C) 2008-2009 Andreas Beckmann <[email protected]>
9  * Copyright (C) 2009 Johannes Singler <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
18 
19 #if STXXL_HAVE_WBTL_FILE
20 
21 #include <algorithm>
22 #include <iomanip>
23 #include <stxxl/bits/io/io.h>
24 #include <stxxl/bits/parallel.h>
25 #include <stxxl/aligned_alloc>
26 
27 #ifndef STXXL_VERBOSE_WBTL
28 #define STXXL_VERBOSE_WBTL STXXL_VERBOSE2
29 #endif
30 
32 
33 wbtl_file::wbtl_file(
34  file* backend_file,
35  size_type write_buffer_size,
36  int write_buffers,
37  int queue_id, int allocator_id)
38  : disk_queued_file(queue_id, allocator_id), storage(backend_file),
39  sz(0), write_block_size(write_buffer_size),
40  free_bytes(0), curbuf(1), curpos(write_block_size)
41 {
42  STXXL_UNUSED(write_buffers);
43  assert(write_buffers == 2); // currently hardcoded
44  write_buffer[0] = static_cast<char*>(stxxl::aligned_alloc<STXXL_BLOCK_ALIGN>(write_block_size));
45  write_buffer[1] = static_cast<char*>(stxxl::aligned_alloc<STXXL_BLOCK_ALIGN>(write_block_size));
46  buffer_address[0] = offset_type(-1);
47  buffer_address[1] = offset_type(-1);
48 }
49 
50 wbtl_file::~wbtl_file()
51 {
52  stxxl::aligned_dealloc<STXXL_BLOCK_ALIGN>(write_buffer[1]);
53  stxxl::aligned_dealloc<STXXL_BLOCK_ALIGN>(write_buffer[0]);
54  delete storage;
55  storage = 0;
56 }
57 
58 void wbtl_file::serve(void* buffer, offset_type offset, size_type bytes,
59  request::request_type type)
60 {
61  if (type == request::READ)
62  {
63  //stats::scoped_read_timer read_timer(size());
64  sread(buffer, offset, bytes);
65  }
66  else
67  {
68  //stats::scoped_write_timer write_timer(size());
69  swrite(buffer, offset, bytes);
70  }
71 }
72 
73 void wbtl_file::lock()
74 {
75  storage->lock();
76 }
77 
78 wbtl_file::offset_type wbtl_file::size()
79 {
80  return sz;
81 }
82 
83 void wbtl_file::set_size(offset_type newsize)
84 {
85  scoped_mutex_lock mapping_lock(mapping_mutex);
86  assert(sz <= newsize); // may not shrink
87  if (sz < newsize) {
88  _add_free_region(sz, newsize - sz);
89  storage->set_size(newsize);
90  sz = newsize;
91  assert(sz == storage->size());
92  }
93 }
94 
// Stream fragments for the verbose/error messages in this file.
//
// FMT_A_S(addr, len): "0x<addr>/0x<len>", both as 8-digit zero-padded hex.
//                     Leaves the stream in hex mode with '0' as fill char.
// FMT_A_C(addr, cnt): "0x<addr>(<cnt>)", address padded to width 8, count in
//                     decimal. Leaves the stream in decimal mode.
// FMT_A(addr):        "0x<addr>", address padded to width 8.
//
// FMT_A_C and FMT_A do not select hex or the fill character themselves; every
// use in this file places them after a FMT_A_S in the same output expression.
#define FMT_A_S(addr, len)                                       \
    "0x" << std::hex << std::setfill('0') << std::setw(8) << (addr) \
         << "/0x" << std::setw(8) << (len)
#define FMT_A_C(addr, cnt) \
    "0x" << std::setw(8) << (addr) << "(" << std::dec << (cnt) << ")"
#define FMT_A(addr) \
    "0x" << std::setw(8) << (addr)
98 
99 // logical address
100 void wbtl_file::discard(offset_type offset, offset_type size)
101 {
102  scoped_mutex_lock mapping_lock(mapping_mutex);
103  sortseq::iterator physical = address_mapping.find(offset);
104  STXXL_VERBOSE_WBTL("wbtl:discard l" << FMT_A_S(offset, size) << " @ p" << FMT_A(physical != address_mapping.end() ? physical->second : 0xffffffff));
105  if (physical == address_mapping.end()) {
106  // could be OK if the block was never written ...
107  //STXXL_ERRMSG("discard: mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???");
108  }
109  else {
110  offset_type physical_offset = physical->second;
111  address_mapping.erase(physical);
112  _add_free_region(physical_offset, size);
113  place_map::iterator reverse = reverse_mapping.find(physical_offset);
114  if (reverse == reverse_mapping.end()) {
115  STXXL_ERRMSG("discard: reverse mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???");
116  }
117  else {
118  assert(offset == (reverse->second).first);
119  reverse_mapping.erase(reverse);
120  }
121  storage->discard(physical_offset, size);
122  }
123 }
124 
125 // physical address
126 void wbtl_file::_add_free_region(offset_type offset, offset_type size)
127 {
128  // mapping_lock has to be aquired by caller
129  STXXL_VERBOSE_WBTL("wbtl:addfre p" << FMT_A_S(offset, size) << " F <= f" << FMT_A_C(free_bytes, free_space.size()));
130  offset_type region_pos = offset;
131  offset_type region_size = size;
132  if (!free_space.empty())
133  {
134  sortseq::iterator succ = free_space.upper_bound(region_pos);
135  sortseq::iterator pred = succ;
136  pred--;
137  check_corruption(region_pos, region_size, pred, succ);
138  if (succ == free_space.end())
139  {
140  if (pred == free_space.end())
141  {
142  //dump();
143  assert(pred != free_space.end());
144  }
145  if ((*pred).first + (*pred).second == region_pos)
146  {
147  // coalesce with predecessor
148  region_size += (*pred).second;
149  region_pos = (*pred).first;
150  free_space.erase(pred);
151  }
152  }
153  else
154  {
155  if (free_space.size() > 1)
156  {
157  bool succ_is_not_the_first = (succ != free_space.begin());
158  if ((*succ).first == region_pos + region_size)
159  {
160  // coalesce with successor
161  region_size += (*succ).second;
162  free_space.erase(succ);
163  }
164  if (succ_is_not_the_first)
165  {
166  if (pred == free_space.end())
167  {
168  //dump();
169  assert(pred != free_space.end());
170  }
171  if ((*pred).first + (*pred).second == region_pos)
172  {
173  // coalesce with predecessor
174  region_size += (*pred).second;
175  region_pos = (*pred).first;
176  free_space.erase(pred);
177  }
178  }
179  }
180  else
181  {
182  if ((*succ).first == region_pos + region_size)
183  {
184  // coalesce with successor
185  region_size += (*succ).second;
186  free_space.erase(succ);
187  }
188  }
189  }
190  }
191 
192  free_space[region_pos] = region_size;
193  free_bytes += size;
194  STXXL_VERBOSE_WBTL("wbtl:free p" << FMT_A_S(region_pos, region_size) << " F => f" << FMT_A_C(free_bytes, free_space.size()));
195 }
196 
197 void wbtl_file::sread(void* buffer, offset_type offset, size_type bytes)
198 {
199  scoped_mutex_lock buffer_lock(buffer_mutex);
200  int cached = -1;
201  offset_type physical_offset;
202  // map logical to physical address
203  {
204  scoped_mutex_lock mapping_lock(mapping_mutex);
205  sortseq::iterator physical = address_mapping.find(offset);
206  if (physical == address_mapping.end()) {
207  STXXL_ERRMSG("wbtl_read: mapping not found: " << FMT_A_S(offset, bytes) << " ==> " << "???");
208  //STXXL_THROW_ERRNO(io_error, "wbtl_read of unmapped memory");
209  physical_offset = 0xffffffff;
210  }
211  else {
212  physical_offset = physical->second;
213  }
214  }
215 
216  if (buffer_address[curbuf] <= physical_offset &&
217  physical_offset < buffer_address[curbuf] + write_block_size)
218  {
219  // block is in current write buffer
220  assert(physical_offset + bytes <= buffer_address[curbuf] + write_block_size);
221  memcpy(buffer, write_buffer[curbuf] + (physical_offset - buffer_address[curbuf]), bytes);
222  stats::get_instance()->read_cached(bytes);
223  cached = curbuf;
224  }
225  else if (buffer_address[1 - curbuf] <= physical_offset &&
226  physical_offset < buffer_address[1 - curbuf] + write_block_size)
227  {
228  // block is in previous write buffer
229  assert(physical_offset + bytes <= buffer_address[1 - curbuf] + write_block_size);
230  memcpy(buffer, write_buffer[1 - curbuf] + (physical_offset - buffer_address[1 - curbuf]), bytes);
231  stats::get_instance()->read_cached(bytes);
232  cached = curbuf;
233  }
234  else if (physical_offset == 0xffffffff) {
235  // block was deleted or never written before
236  char* uninitialized = (char*)malloc(sizeof(char));
237  memset(buffer, *uninitialized, bytes);
238  free(uninitialized);
239  }
240  else
241  {
242  // block is not cached
243  request_ptr req = storage->aread(buffer, physical_offset, bytes);
244  req->wait(false);
245  }
246  STXXL_VERBOSE_WBTL("wbtl:sread l" << FMT_A_S(offset, bytes) << " @ p" << FMT_A(physical_offset) << " " << std::dec << cached);
247  STXXL_UNUSED(cached);
248 }
249 
250 void wbtl_file::swrite(void* buffer, offset_type offset, size_type bytes)
251 {
252  scoped_mutex_lock buffer_lock(buffer_mutex);
253  // is the block already mapped?
254  {
255  scoped_mutex_lock mapping_lock(mapping_mutex);
256  sortseq::iterator physical = address_mapping.find(offset);
257  STXXL_VERBOSE_WBTL("wbtl:swrite l" << FMT_A_S(offset, bytes) << " @ <= p" <<
258  FMT_A_C(physical != address_mapping.end() ? physical->second : 0xffffffff, address_mapping.size()));
259  if (physical != address_mapping.end()) {
260  mapping_lock.unlock();
261  // FIXME: special case if we can replace it in the current writing block
262  discard(offset, bytes);
263  }
264  }
265 
266  if (bytes > write_block_size - curpos)
267  {
268  // not enough space in the current write buffer
269 
270  if (buffer_address[curbuf] != offset_type(-1)) {
271  STXXL_VERBOSE_WBTL("wbtl:w2disk p" << FMT_A_S(buffer_address[curbuf], write_block_size));
272 
273  // mark remaining part as free
274  if (curpos < write_block_size)
275  _add_free_region(buffer_address[curbuf] + curpos, write_block_size - curpos);
276 
277  if (backend_request.get()) {
278  backend_request->wait(false);
279  }
280 
281  backend_request = storage->awrite(write_buffer[curbuf], buffer_address[curbuf], write_block_size);
282  }
283 
284  curbuf = 1 - curbuf;
285 
286  buffer_address[curbuf] = get_next_write_block();
287  curpos = 0;
288  }
289  assert(bytes <= write_block_size - curpos);
290 
291  // write block into buffer
292  memcpy(write_buffer[curbuf] + curpos, buffer, bytes);
293  stats::get_instance()->write_cached(bytes);
294 
295  scoped_mutex_lock mapping_lock(mapping_mutex);
296  address_mapping[offset] = buffer_address[curbuf] + curpos;
297  reverse_mapping[buffer_address[curbuf] + curpos] = place(offset, bytes);
298  STXXL_VERBOSE_WBTL("wbtl:swrite l" << FMT_A_S(offset, bytes) << " @ => p" << FMT_A_C(buffer_address[curbuf] + curpos, address_mapping.size()));
299  curpos += bytes;
300 }
301 
302 wbtl_file::offset_type wbtl_file::get_next_write_block()
303 {
304  // mapping_lock has to be aquired by caller
305  sortseq::iterator space =
306  std::find_if(free_space.begin(), free_space.end(),
307  bind2nd(FirstFit(), write_block_size) _STXXL_FORCE_SEQUENTIAL);
308 
309  if (space != free_space.end())
310  {
311  offset_type region_pos = (*space).first;
312  offset_type region_size = (*space).second;
313  free_space.erase(space);
314  if (region_size > write_block_size)
315  free_space[region_pos + write_block_size] = region_size - write_block_size;
316 
317  free_bytes -= write_block_size;
318 
319  STXXL_VERBOSE_WBTL("wbtl:nextwb p" << FMT_A_S(region_pos, write_block_size) << " F f" << FMT_A_C(free_bytes, free_space.size()));
320  return region_pos;
321  }
322 
323  STXXL_THROW_ERRNO(io_error, "OutOfSpace, probably fragmented");
324 }
325 
326 void wbtl_file::check_corruption(offset_type region_pos, offset_type region_size,
327  sortseq::iterator pred, sortseq::iterator succ)
328 {
329  if (pred != free_space.end())
330  {
331  if (pred->first <= region_pos && pred->first + pred->second > region_pos)
332  {
333  STXXL_THROW(bad_ext_alloc, "Error: double deallocation of external memory " <<
334  "System info: P " << pred->first << " " << pred->second << " " << region_pos);
335  }
336  }
337  if (succ != free_space.end())
338  {
339  if (region_pos <= succ->first && region_pos + region_size > succ->first)
340  {
341  STXXL_THROW(bad_ext_alloc, "Error: double deallocation of external memory "
342  << "System info: S " << region_pos << " " << region_size << " " << succ->first);
343  }
344  }
345 }
346 
347 const char* wbtl_file::io_type() const
348 {
349  return "wbtl";
350 }
351 
353 
354 #endif // #if STXXL_HAVE_WBTL_FILE
355 // vim: et:ts=4:sw=4
void * malloc(size_t size)
#define STXXL_THROW(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message]".
counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.h:113
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:22
void free(void *ptr)
#define STXXL_ERRMSG(x)
Definition: verbose.h:94
#define STXXL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]".
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.h:96
#define _STXXL_FORCE_SEQUENTIAL
Definition: parallel.h:34
void discard(StreamAlgorithm &in)
Reads stream content and discards it. Useful where you do not need the processed stream anymore...
Definition: stream.h:640
#define STXXL_END_NAMESPACE
Definition: namespace.h:17