STXXL  1.4.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
wbtl_file.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * lib/io/wbtl_file.cpp
3  *
4  * a write-buffered-translation-layer pseudo file
5  *
6  * Part of the STXXL. See http://stxxl.sourceforge.net
7  *
8  * Copyright (C) 2008-2009 Andreas Beckmann <[email protected]>
9  * Copyright (C) 2009 Johannes Singler <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
18 
19 #if STXXL_HAVE_WBTL_FILE
20 
21 #include <algorithm>
22 #include <iomanip>
23 #include <stxxl/bits/io/io.h>
24 #include <stxxl/bits/parallel.h>
25 #include <stxxl/aligned_alloc>
26 
27 #ifndef STXXL_VERBOSE_WBTL
28 #define STXXL_VERBOSE_WBTL STXXL_VERBOSE2
29 #endif
30 
31 
33 
34 
35 wbtl_file::wbtl_file(
36  file* backend_file,
37  size_type write_buffer_size,
38  int write_buffers,
39  int queue_id, int allocator_id) :
40  disk_queued_file(queue_id, allocator_id), storage(backend_file), sz(0), write_block_size(write_buffer_size),
41  free_bytes(0), curbuf(1), curpos(write_block_size)
42 {
43  STXXL_UNUSED(write_buffers);
44  assert(write_buffers == 2); // currently hardcoded
45  write_buffer[0] = static_cast<char*>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size));
46  write_buffer[1] = static_cast<char*>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size));
47  buffer_address[0] = offset_type(-1);
48  buffer_address[1] = offset_type(-1);
49 }
50 
51 wbtl_file::~wbtl_file()
52 {
53  stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[1]);
54  stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[0]);
55  delete storage;
56  storage = 0;
57 }
58 
59 void wbtl_file::serve(const request* req) throw (io_error)
60 {
61  assert(req->get_file() == this);
62  offset_type offset = req->get_offset();
63  void* buffer = req->get_buffer();
64  size_type bytes = req->get_size();
65  request::request_type type = req->get_type();
66 
67  if (type == request::READ)
68  {
69  //stats::scoped_read_timer read_timer(size());
70  sread(buffer, offset, bytes);
71  }
72  else
73  {
74  //stats::scoped_write_timer write_timer(size());
75  swrite(buffer, offset, bytes);
76  }
77 }
78 
79 void wbtl_file::lock()
80 {
81  storage->lock();
82 }
83 
84 wbtl_file::offset_type wbtl_file::size()
85 {
86  return sz;
87 }
88 
89 void wbtl_file::set_size(offset_type newsize)
90 {
91  scoped_mutex_lock mapping_lock(mapping_mutex);
92  assert(sz <= newsize); // may not shrink
93  if (sz < newsize) {
94  _add_free_region(sz, newsize - sz);
95  storage->set_size(newsize);
96  sz = newsize;
97  assert(sz == storage->size());
98  }
99 }
100 
// Stream fragments used inside the log messages of this file.
//
// FMT_A_S switches the stream to hex with '0' fill and prints address and
// size as two 8-digit fields. FMT_A_C and FMT_A do not set the base or the
// fill character themselves, so they are meant to follow a FMT_A_S in the
// same output expression. FMT_A_C switches the stream back to decimal before
// printing its second argument.

// address/size:  0xAAAAAAAA/0xSSSSSSSS
#define FMT_A_S(_addr_, _size_) \
    "0x" << std::hex << std::setfill('0') << std::setw(8) << (_addr_) \
         << "/0x" << std::setw(8) << (_size_)

// address(count):  0xAAAAAAAA(n)
#define FMT_A_C(_addr_, _size_) \
    "0x" << std::setw(8) << (_addr_) \
         << "(" << std::dec << (_size_) << ")"

// address only:  0xAAAAAAAA
#define FMT_A(_addr_) \
    "0x" << std::setw(8) << (_addr_)
104 
105 // logical address
106 void wbtl_file::discard(offset_type offset, offset_type size)
107 {
108  scoped_mutex_lock mapping_lock(mapping_mutex);
109  sortseq::iterator physical = address_mapping.find(offset);
110  STXXL_VERBOSE_WBTL("wbtl:discard l" << FMT_A_S(offset, size) << " @ p" << FMT_A(physical != address_mapping.end() ? physical->second : 0xffffffff));
111  if (physical == address_mapping.end()) {
112  // could be OK if the block was never written ...
113  //STXXL_ERRMSG("discard: mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???");
114  } else {
115  offset_type physical_offset = physical->second;
116  address_mapping.erase(physical);
117  _add_free_region(physical_offset, size);
118  place_map::iterator reverse = reverse_mapping.find(physical_offset);
119  if (reverse == reverse_mapping.end()) {
120  STXXL_ERRMSG("discard: reverse mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???");
121  } else {
122  assert(offset == (reverse->second).first);
123  reverse_mapping.erase(reverse);
124  }
125  storage->discard(physical_offset, size);
126  }
127 }
128 
129 // physical address
130 void wbtl_file::_add_free_region(offset_type offset, offset_type size)
131 {
132  // mapping_lock has to be aquired by caller
133  STXXL_VERBOSE_WBTL("wbtl:addfre p" << FMT_A_S(offset, size) << " F <= f" << FMT_A_C(free_bytes, free_space.size()));
134  offset_type region_pos = offset;
135  offset_type region_size = size;
136  if (!free_space.empty())
137  {
138  sortseq::iterator succ = free_space.upper_bound(region_pos);
139  sortseq::iterator pred = succ;
140  pred--;
141  check_corruption(region_pos, region_size, pred, succ);
142  if (succ == free_space.end())
143  {
144  if (pred == free_space.end())
145  {
146  //dump();
147  assert(pred != free_space.end());
148  }
149  if ((*pred).first + (*pred).second == region_pos)
150  {
151  // coalesce with predecessor
152  region_size += (*pred).second;
153  region_pos = (*pred).first;
154  free_space.erase(pred);
155  }
156  }
157  else
158  {
159  if (free_space.size() > 1)
160  {
161  bool succ_is_not_the_first = (succ != free_space.begin());
162  if ((*succ).first == region_pos + region_size)
163  {
164  // coalesce with successor
165  region_size += (*succ).second;
166  free_space.erase(succ);
167  }
168  if (succ_is_not_the_first)
169  {
170  if (pred == free_space.end())
171  {
172  //dump();
173  assert(pred != free_space.end());
174  }
175  if ((*pred).first + (*pred).second == region_pos)
176  {
177  // coalesce with predecessor
178  region_size += (*pred).second;
179  region_pos = (*pred).first;
180  free_space.erase(pred);
181  }
182  }
183  }
184  else
185  {
186  if ((*succ).first == region_pos + region_size)
187  {
188  // coalesce with successor
189  region_size += (*succ).second;
190  free_space.erase(succ);
191  }
192  }
193  }
194  }
195 
196  free_space[region_pos] = region_size;
197  free_bytes += size;
198  STXXL_VERBOSE_WBTL("wbtl:free p" << FMT_A_S(region_pos, region_size) << " F => f" << FMT_A_C(free_bytes, free_space.size()));
199 }
200 
201 void wbtl_file::sread(void* buffer, offset_type offset, size_type bytes)
202 {
203  scoped_mutex_lock buffer_lock(buffer_mutex);
204  int cached = -1;
205  offset_type physical_offset;
206  // map logical to physical address
207  {
208  scoped_mutex_lock mapping_lock(mapping_mutex);
209  sortseq::iterator physical = address_mapping.find(offset);
210  if (physical == address_mapping.end()) {
211  STXXL_ERRMSG("wbtl_read: mapping not found: " << FMT_A_S(offset, bytes) << " ==> " << "???");
212  //STXXL_THROW_ERRNO(io_error, "wbtl_read of unmapped memory");
213  physical_offset = 0xffffffff;
214  } else {
215  physical_offset = physical->second;
216  }
217  }
218 
219  if (buffer_address[curbuf] <= physical_offset &&
220  physical_offset < buffer_address[curbuf] + write_block_size)
221  {
222  // block is in current write buffer
223  assert(physical_offset + bytes <= buffer_address[curbuf] + write_block_size);
224  memcpy(buffer, write_buffer[curbuf] + (physical_offset - buffer_address[curbuf]), bytes);
225  stats::get_instance()->read_cached(bytes);
226  cached = curbuf;
227  }
228  else if (buffer_address[1 - curbuf] <= physical_offset &&
229  physical_offset < buffer_address[1 - curbuf] + write_block_size)
230  {
231  // block is in previous write buffer
232  assert(physical_offset + bytes <= buffer_address[1 - curbuf] + write_block_size);
233  memcpy(buffer, write_buffer[1 - curbuf] + (physical_offset - buffer_address[1 - curbuf]), bytes);
234  stats::get_instance()->read_cached(bytes);
235  cached = curbuf;
236  }
237  else if (physical_offset == 0xffffffff) {
238  // block was deleted or never written before
239  char* uninitialized = (char*)malloc(sizeof(char));
240  memset(buffer, *uninitialized, bytes);
241  free(uninitialized);
242  }
243  else
244  {
245  // block is not cached
246  request_ptr req = storage->aread(buffer, physical_offset, bytes, default_completion_handler());
247  req->wait(false);
248  }
249  STXXL_VERBOSE_WBTL("wbtl:sread l" << FMT_A_S(offset, bytes) << " @ p" << FMT_A(physical_offset) << " " << std::dec << cached);
250  STXXL_UNUSED(cached);
251 }
252 
253 void wbtl_file::swrite(void* buffer, offset_type offset, size_type bytes)
254 {
255  scoped_mutex_lock buffer_lock(buffer_mutex);
256  // is the block already mapped?
257  {
258  scoped_mutex_lock mapping_lock(mapping_mutex);
259  sortseq::iterator physical = address_mapping.find(offset);
260  STXXL_VERBOSE_WBTL("wbtl:swrite l" << FMT_A_S(offset, bytes) << " @ <= p" <<
261  FMT_A_C(physical != address_mapping.end() ? physical->second : 0xffffffff, address_mapping.size()));
262  if (physical != address_mapping.end()) {
263  mapping_lock.unlock();
264  // FIXME: special case if we can replace it in the current writing block
265  discard(offset, bytes);
266  }
267  }
268 
269  if (bytes > write_block_size - curpos)
270  {
271  // not enough space in the current write buffer
272 
273  if (buffer_address[curbuf] != offset_type(-1)) {
274  STXXL_VERBOSE_WBTL("wbtl:w2disk p" << FMT_A_S(buffer_address[curbuf], write_block_size));
275 
276  // mark remaining part as free
277  if (curpos < write_block_size)
278  _add_free_region(buffer_address[curbuf] + curpos, write_block_size - curpos);
279 
280  if (backend_request.get()) {
281  backend_request->wait(false);
282  }
283 
284  backend_request = storage->awrite(write_buffer[curbuf], buffer_address[curbuf], write_block_size, default_completion_handler());
285  }
286 
287  curbuf = 1 - curbuf;
288 
289  buffer_address[curbuf] = get_next_write_block();
290  curpos = 0;
291  }
292  assert(bytes <= write_block_size - curpos);
293 
294  // write block into buffer
295  memcpy(write_buffer[curbuf] + curpos, buffer, bytes);
296  stats::get_instance()->write_cached(bytes);
297 
298  scoped_mutex_lock mapping_lock(mapping_mutex);
299  address_mapping[offset] = buffer_address[curbuf] + curpos;
300  reverse_mapping[buffer_address[curbuf] + curpos] = place(offset, bytes);
301  STXXL_VERBOSE_WBTL("wbtl:swrite l" << FMT_A_S(offset, bytes) << " @ => p" << FMT_A_C(buffer_address[curbuf] + curpos, address_mapping.size()));
302  curpos += bytes;
303 }
304 
305 wbtl_file::offset_type wbtl_file::get_next_write_block()
306 {
307  // mapping_lock has to be aquired by caller
308  sortseq::iterator space =
309  std::find_if(free_space.begin(), free_space.end(),
310  bind2nd(FirstFit(), write_block_size) _STXXL_FORCE_SEQUENTIAL);
311 
312  if (space != free_space.end())
313  {
314  offset_type region_pos = (*space).first;
315  offset_type region_size = (*space).second;
316  free_space.erase(space);
317  if (region_size > write_block_size)
318  free_space[region_pos + write_block_size] = region_size - write_block_size;
319 
320  free_bytes -= write_block_size;
321 
322  STXXL_VERBOSE_WBTL("wbtl:nextwb p" << FMT_A_S(region_pos, write_block_size) << " F f" << FMT_A_C(free_bytes, free_space.size()));
323  return region_pos;
324  }
325 
326  STXXL_THROW_ERRNO(io_error, "OutOfSpace, probably fragmented");
327 }
328 
329 void wbtl_file::check_corruption(offset_type region_pos, offset_type region_size,
330  sortseq::iterator pred, sortseq::iterator succ)
331 {
332  if (pred != free_space.end())
333  {
334  if (pred->first <= region_pos && pred->first + pred->second > region_pos)
335  {
336  STXXL_THROW(bad_ext_alloc, "Error: double deallocation of external memory " <<
337  "System info: P " << pred->first << " " << pred->second << " " << region_pos);
338  }
339  }
340  if (succ != free_space.end())
341  {
342  if (region_pos <= succ->first && region_pos + region_size > succ->first)
343  {
344  STXXL_THROW(bad_ext_alloc, "Error: double deallocation of external memory "
345  << "System info: S " << region_pos << " " << region_size << " " << succ->first);
346  }
347  }
348 }
349 
350 const char* wbtl_file::io_type() const
351 {
352  return "wbtl";
353 }
354 
356 
357 #endif // #if STXXL_HAVE_WBTL_FILE
358 // vim: et:ts=4:sw=4
static const int bytes
number of bytes in uint_pair
Definition: uint_types.h:99
counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.h:112
#define STXXL_THROW(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message]".
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:23
#define STXXL_ERRMSG(x)
Definition: verbose.h:79
#define STXXL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]".
void discard(StreamAlgorithm_ &in)
Reads stream content and discards it. Useful where you do not need the processed stream anymore...
Definition: stream.h:606
#define _STXXL_FORCE_SEQUENTIAL
Definition: parallel.h:51
#define STXXL_END_NAMESPACE
Definition: namespace.h:17