19 #if STXXL_HAVE_WBTL_FILE
// Trace macro used by the "wbtl:..." log statements in this file. Unless it
// has already been defined, it defaults to STXXL_VERBOSE2.
27 #ifndef STXXL_VERBOSE_WBTL
28 #define STXXL_VERBOSE_WBTL STXXL_VERBOSE2
// Constructor (shown from the middle of its parameter list onward).
// Passes queue_id/allocator_id to the disk_queued_file base, keeps the
// backend file in `storage`, uses write_buffer_size as the size of each
// write buffer, and allocates the two write buffers.
// NOTE(review): the start of the signature is not present in this listing;
// `backend_file` and `write_buffers` are used below, so they are presumably
// the parameters declared on the missing lines - confirm.
37 size_type write_buffer_size,
39 int queue_id,
int allocator_id) :
// curpos starts at write_block_size, so `write_block_size - curpos` is 0 and
// the `bytes > write_block_size - curpos` test in swrite() is true for the
// first non-empty write.
40 disk_queued_file(queue_id, allocator_id), storage(backend_file), sz(0), write_block_size(write_buffer_size),
41 free_bytes(0), curbuf(1), curpos(write_block_size)
// Only two write buffers are handled: write_buffer[0] and write_buffer[1].
44 assert(write_buffers == 2);
// Both buffers are write_block_size bytes, allocated with BLOCK_ALIGN alignment.
45 write_buffer[0] =
static_cast<char*
>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size));
46 write_buffer[1] =
static_cast<char*
>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size));
// offset_type(-1) is the initial address of both buffers; swrite() only
// writes a buffer to storage when its address differs from this value.
47 buffer_address[0] = offset_type(-1);
48 buffer_address[1] = offset_type(-1);
// Destructor: releases the two aligned write buffers, in the reverse order
// of their allocation in the constructor.
// NOTE(review): further cleanup statements (e.g. for `storage`) may sit on
// lines that are not present in this listing - confirm.
51 wbtl_file::~wbtl_file()
53 stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[1]);
54 stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[0]);
// Serves a single request issued against this file: unpacks offset, buffer,
// size and type from the request, then calls sread() for READ requests; the
// swrite() call below handles the other case.
// Declared with a dynamic exception specification allowing io_error.
// NOTE(review): the braces and the `else` separating the sread()/swrite()
// calls are not present in this listing - presumably swrite() is the
// non-READ branch; confirm against the full source.
59 void wbtl_file::serve(
const request* req)
throw (io_error)
// The request must have been created for this very file object.
61 assert(req->get_file() ==
this);
62 offset_type offset = req->get_offset();
63 void* buffer = req->get_buffer();
64 size_type
bytes = req->get_size();
65 request::request_type type = req->get_type();
67 if (type == request::READ)
70 sread(buffer, offset, bytes);
75 swrite(buffer, offset, bytes);
// lock() member of wbtl_file.
// NOTE(review): the function body is not present in this listing, so what
// is actually locked cannot be confirmed from here.
79 void wbtl_file::lock()
// size() member of wbtl_file; returns an offset_type.
// NOTE(review): the body is not present in this listing - presumably it
// reports the `sz` member maintained by set_size(); confirm.
84 wbtl_file::offset_type wbtl_file::size()
// Grows the file to `newsize`. While holding mapping_mutex, it registers the
// range starting at the current size `sz` with length (newsize - sz) as free
// space and resizes the backend storage to `newsize`.
// NOTE(review): the statement that updates `sz`, and any guard around the
// growth steps, are not present in this listing - confirm.
89 void wbtl_file::set_size(offset_type newsize)
91 scoped_mutex_lock mapping_lock(mapping_mutex);
// Shrinking is not supported.
92 assert(sz <= newsize);
94 _add_free_region(sz, newsize - sz);
95 storage->set_size(newsize);
// Afterwards the recorded size is expected to match the backend's size.
97 assert(sz == storage->size());
// Stream-formatting helpers for the log statements in this file. Each one
// expands to a chain of `<<` operands and is meant to be used inside a
// stream expression.
//
// FMT_A_S(addr, size): prints "0x<addr>/0x<size>". It switches the stream to
// hex with '0' as fill character and gives both values a field width of 8.
101 #define FMT_A_S(_addr_, _size_) "0x" << std::hex << std::setfill('0') << std::setw(8) << (_addr_) << "/0x" << std::setw(8) << (_size_)
// FMT_A_C(addr, count): prints "0x<addr>(<count>)" with the address at width
// 8 and the count in decimal. It does not set std::hex or the fill character
// itself; in this file it always follows an FMT_A_S in the same expression,
// which has already set them. Note it leaves the stream in decimal mode.
102 #define FMT_A_C(_addr_, _size_) "0x" << std::setw(8) << (_addr_) << "(" << std::dec << (_size_) << ")"
// FMT_A(addr): prints "0x<addr>" at width 8; like FMT_A_C it relies on the
// base and fill character already in effect on the stream.
103 #define FMT_A(_addr_) "0x" << std::setw(8) << (_addr_)
// Body of the discard routine: `offset` is a logical address (it is looked
// up in address_mapping) and `size` the length of the discarded range.
// NOTE(review): the signature line, the braces and the `else` lines are not
// present in this listing; the names `offset` and `size` come from the uses
// below.
// The whole operation runs under mapping_mutex.
108 scoped_mutex_lock mapping_lock(mapping_mutex);
// Translate the logical offset to its physical location, if one is recorded.
109 sortseq::iterator physical = address_mapping.find(offset);
// The log prints 0xffffffff as physical address when no mapping exists.
110 STXXL_VERBOSE_WBTL(
"wbtl:discard l" << FMT_A_S(offset, size) <<
" @ p" << FMT_A(physical != address_mapping.end() ? physical->second : 0xffffffff));
// No mapping recorded for this logical offset.
// NOTE(review): what happens in this case is on lines missing from this
// listing; the statements below dereference `physical`, so they presumably
// belong to the "mapping found" branch - confirm.
111 if (physical == address_mapping.end()) {
// Drop the logical->physical entry and hand the physical range back to the
// free-space map.
115 offset_type physical_offset = physical->second;
116 address_mapping.erase(physical);
117 _add_free_region(physical_offset, size);
// Remove the matching physical->logical entry as well; a missing entry is
// reported but not treated as fatal.
118 place_map::iterator reverse = reverse_mapping.find(physical_offset);
119 if (reverse == reverse_mapping.end()) {
120 STXXL_ERRMSG(
"discard: reverse mapping not found: " << FMT_A_S(offset, size) <<
" ==> " <<
"???");
// The reverse entry must point back at the logical offset being discarded.
122 assert(offset == (reverse->second).first);
123 reverse_mapping.erase(reverse);
// Forward the discard to the backend file using the physical address.
125 storage->discard(physical_offset, size);
// Records the physical range starting at `offset` with length `size` as
// free. free_space maps a region's start position to its length; the new
// region is merged with an entry that ends exactly where it starts
// (predecessor) and/or an entry that starts exactly where it ends
// (successor), and the merged region is stored as a single entry.
// May throw bad_ext_alloc via check_corruption() if the range overlaps an
// existing free entry.
// NOTE(review): braces and `else` lines are not present in this listing, and
// no update of free_bytes is visible although it is logged before and after
// - confirm against the full source.
130 void wbtl_file::_add_free_region(offset_type offset, offset_type size)
133 STXXL_VERBOSE_WBTL(
"wbtl:addfre p" << FMT_A_S(offset, size) <<
" F <= f" << FMT_A_C(free_bytes, free_space.size()));
// Working copy of the region; it grows as neighbours are merged in.
134 offset_type region_pos = offset;
135 offset_type region_size = size;
// With an empty map there is nothing to merge with.
136 if (!free_space.empty())
// succ: first free entry that starts after region_pos.
138 sortseq::iterator succ = free_space.upper_bound(region_pos);
// NOTE(review): as shown, pred is a plain copy of succ; the step that moves
// it to the preceding entry is presumably on a line missing from this
// listing - confirm.
139 sortseq::iterator pred = succ;
// Reject ranges that overlap an existing free entry (double free).
141 check_corruption(region_pos, region_size, pred, succ);
// Case: no entry starts after the new region, so only a predecessor can be
// adjacent.
142 if (succ == free_space.end())
144 if (pred == free_space.end())
147 assert(pred != free_space.end());
// Predecessor ends exactly at region_pos: absorb it.
149 if ((*pred).first + (*pred).second == region_pos)
152 region_size += (*pred).second;
153 region_pos = (*pred).first;
154 free_space.erase(pred);
// Case: a successor exists and the map holds more than one entry.
159 if (free_space.size() > 1)
// Evaluated before succ may be erased below; when succ is the first entry
// there is no predecessor to look at.
161 bool succ_is_not_the_first = (succ != free_space.begin());
// Successor starts exactly at the end of the region: absorb it.
162 if ((*succ).first == region_pos + region_size)
165 region_size += (*succ).second;
166 free_space.erase(succ);
168 if (succ_is_not_the_first)
170 if (pred == free_space.end())
173 assert(pred != free_space.end());
// Predecessor ends exactly at region_pos: absorb it.
175 if ((*pred).first + (*pred).second == region_pos)
178 region_size += (*pred).second;
179 region_pos = (*pred).first;
180 free_space.erase(pred);
// Case: the successor is the only entry in the map (presumably the `else`
// of the size() > 1 test above; the `else` line is not in this listing).
186 if ((*succ).first == region_pos + region_size)
189 region_size += (*succ).second;
190 free_space.erase(succ);
// Store the (possibly merged) region.
196 free_space[region_pos] = region_size;
198 STXXL_VERBOSE_WBTL(
"wbtl:free p" << FMT_A_S(region_pos, region_size) <<
" F => f" << FMT_A_C(free_bytes, free_space.size()));
// Reads `bytes` bytes at logical address `offset` into `buffer`. The logical
// address is translated through address_mapping; the data then comes from
// whichever of the two in-memory write buffers covers the physical address,
// or otherwise from the backend storage.
// NOTE(review): braces, `else` lines and the declaration/assignment of
// `cached` (printed in the final log statement) are not present in this
// listing - confirm against the full source.
201 void wbtl_file::sread(
void* buffer, offset_type offset, size_type bytes)
// buffer_mutex is held for the whole call.
203 scoped_mutex_lock buffer_lock(buffer_mutex);
205 offset_type physical_offset;
// Logical -> physical translation under mapping_mutex.
208 scoped_mutex_lock mapping_lock(mapping_mutex);
209 sortseq::iterator physical = address_mapping.find(offset);
210 if (physical == address_mapping.end()) {
211 STXXL_ERRMSG(
"wbtl_read: mapping not found: " << FMT_A_S(offset, bytes) <<
" ==> " <<
"???");
// 0xffffffff marks "no mapping"; it is tested again further down.
// NOTE(review): if offset_type is wider than 32 bits this value is also a
// representable physical offset - confirm it cannot collide.
213 physical_offset = 0xffffffff;
215 physical_offset = physical->second;
// Data lies inside the current write buffer: copy it from memory.
219 if (buffer_address[curbuf] <= physical_offset &&
220 physical_offset < buffer_address[curbuf] + write_block_size)
// The requested range must not extend past the end of the buffer.
223 assert(physical_offset + bytes <= buffer_address[curbuf] + write_block_size);
224 memcpy(buffer, write_buffer[curbuf] + (physical_offset - buffer_address[curbuf]), bytes);
225 stats::get_instance()->read_cached(bytes);
// Data lies inside the other write buffer (index 1 - curbuf).
228 else if (buffer_address[1 - curbuf] <= physical_offset &&
229 physical_offset < buffer_address[1 - curbuf] + write_block_size)
232 assert(physical_offset + bytes <= buffer_address[1 - curbuf] + write_block_size);
233 memcpy(buffer, write_buffer[1 - curbuf] + (physical_offset - buffer_address[1 - curbuf]), bytes);
234 stats::get_instance()->read_cached(bytes);
// No mapping was found above: fill the caller's buffer with the value of one
// byte read from freshly malloc'ed memory, whose content malloc does not
// initialize.
// NOTE(review): the malloc result is dereferenced without a null check, and
// no matching free() is visible in this listing - confirm.
237 else if (physical_offset == 0xffffffff) {
239 char* uninitialized = (
char*)malloc(
sizeof(
char));
240 memset(buffer, *uninitialized, bytes);
// Not in either write buffer: issue an asynchronous read on the backend.
// NOTE(review): waiting on `req` is not visible in this listing - confirm.
246 request_ptr req = storage->aread(buffer, physical_offset, bytes, default_completion_handler());
249 STXXL_VERBOSE_WBTL(
"wbtl:sread l" << FMT_A_S(offset, bytes) <<
" @ p" << FMT_A(physical_offset) <<
" " << std::dec << cached);
// Writes `bytes` bytes from `buffer` to logical address `offset`. The data
// is appended to the current write buffer at `curpos`, and the logical
// address is mapped to the physical position inside that buffer. When the
// data does not fit, a buffer that already has an address is first written
// to the backend storage as one block of write_block_size bytes.
// NOTE(review): braces and the statements that switch `curbuf` and
// reset/advance `curpos` are not present in this listing - confirm against
// the full source.
253 void wbtl_file::swrite(
void* buffer, offset_type offset, size_type bytes)
// buffer_mutex is held for the whole call.
255 scoped_mutex_lock buffer_lock(buffer_mutex);
// Check under mapping_mutex whether this logical offset is already mapped.
258 scoped_mutex_lock mapping_lock(mapping_mutex);
259 sortseq::iterator physical = address_mapping.find(offset);
260 STXXL_VERBOSE_WBTL(
"wbtl:swrite l" << FMT_A_S(offset, bytes) <<
" @ <= p" <<
261 FMT_A_C(physical != address_mapping.end() ? physical->second : 0xffffffff, address_mapping.size()));
// An existing mapping is handled with the mapping lock released.
// NOTE(review): the statement following unlock() is not present in this
// listing - presumably the old location is discarded; confirm.
262 if (physical != address_mapping.end()) {
263 mapping_lock.unlock();
// The data does not fit into the remaining space of the current buffer.
269 if (bytes > write_block_size - curpos)
// Only a buffer that has been assigned an address is written out.
273 if (buffer_address[curbuf] != offset_type(-1)) {
274 STXXL_VERBOSE_WBTL(
"wbtl:w2disk p" << FMT_A_S(buffer_address[curbuf], write_block_size));
// The unused tail of the block is returned to the free-space map.
277 if (curpos < write_block_size)
278 _add_free_region(buffer_address[curbuf] + curpos, write_block_size - curpos);
// Wait for the previously issued backend write before starting the next one.
280 if (backend_request.get()) {
281 backend_request->wait(
false);
284 backend_request = storage->awrite(write_buffer[curbuf], buffer_address[curbuf], write_block_size, default_completion_handler());
// Obtain the physical address of the next block to fill.
289 buffer_address[curbuf] = get_next_write_block();
// From here on the data must fit into the current buffer.
292 assert(bytes <= write_block_size - curpos);
295 memcpy(write_buffer[curbuf] + curpos, buffer, bytes);
296 stats::get_instance()->write_cached(bytes);
// Record logical -> physical and physical -> (logical, size) under
// mapping_mutex.
298 scoped_mutex_lock mapping_lock(mapping_mutex);
299 address_mapping[offset] = buffer_address[curbuf] + curpos;
300 reverse_mapping[buffer_address[curbuf] + curpos] = place(offset, bytes);
301 STXXL_VERBOSE_WBTL(
"wbtl:swrite l" << FMT_A_S(offset, bytes) <<
" @ => p" << FMT_A_C(buffer_address[curbuf] + curpos, address_mapping.size()));
// Takes write_block_size bytes from the free-space map for the next write
// block: a matching free region is removed from free_space, any remainder
// beyond write_block_size is re-inserted, and free_bytes is reduced.
// NOTE(review): the find_if predicate, the handling of "no region found",
// the braces and the return statement are not present in this listing -
// presumably the predicate selects a region of at least write_block_size
// bytes and the start of the chosen region is returned; confirm.
305 wbtl_file::offset_type wbtl_file::get_next_write_block()
// Linear search over the free regions.
308 sortseq::iterator space =
309 std::find_if(free_space.begin(), free_space.end(),
312 if (space != free_space.end())
// Copy position and length before the entry is erased.
314 offset_type region_pos = (*space).first;
315 offset_type region_size = (*space).second;
316 free_space.erase(space);
// Keep whatever lies beyond the first write_block_size bytes as free space.
317 if (region_size > write_block_size)
318 free_space[region_pos + write_block_size] = region_size - write_block_size;
320 free_bytes -= write_block_size;
322 STXXL_VERBOSE_WBTL(
"wbtl:nextwb p" << FMT_A_S(region_pos, write_block_size) <<
" F f" << FMT_A_C(free_bytes, free_space.size()));
// Consistency check used before a region is added to free_space: throws
// bad_ext_alloc ("double deallocation") when the region given by region_pos
// and region_size overlaps one of the neighbouring free entries `pred` or
// `succ`. Iterators equal to free_space.end() are skipped.
329 void wbtl_file::check_corruption(offset_type region_pos, offset_type region_size,
330 sortseq::iterator pred, sortseq::iterator succ)
332 if (pred != free_space.end())
// pred starts at or before region_pos and extends past it: region_pos is
// already inside a free entry.
334 if (pred->first <= region_pos && pred->first + pred->second > region_pos)
336 STXXL_THROW(bad_ext_alloc,
"Error: double deallocation of external memory " <<
337 "System info: P " << pred->first <<
" " << pred->second <<
" " << region_pos);
340 if (succ != free_space.end())
// succ starts inside the region being freed.
342 if (region_pos <= succ->first && region_pos + region_size > succ->first)
344 STXXL_THROW(bad_ext_alloc,
"Error: double deallocation of external memory "
345 <<
"System info: S " << region_pos <<
" " << region_size <<
" " << succ->first);
// io_type() const member of wbtl_file; returns a C string.
// NOTE(review): the body is not present in this listing, so the returned
// text cannot be confirmed from here.
350 const char* wbtl_file::io_type()
const
357 #endif // #if STXXL_HAVE_WBTL_FILE
static const int bytes
number of bytes in uint_pair
counting_ptr< request > request_ptr
A reference counting pointer for request.
#define STXXL_THROW(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message]".
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
#define STXXL_BEGIN_NAMESPACE
void STXXL_UNUSED(const U &)
#define STXXL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]".
void discard(StreamAlgorithm_ &in)
Reads stream content and discards it. Useful where you do not need the processed stream anymore...
#define _STXXL_FORCE_SEQUENTIAL
#define STXXL_END_NAMESPACE