14 #ifndef STXXL_CONTAINERS_PARALLEL_PRIORITY_QUEUE_HEADER
15 #define STXXL_CONTAINERS_PARALLEL_PRIORITY_QUEUE_HEADER
31 #if __cplusplus >= 201103L
32 #define STXXL_MOVE(T) std::move(T)
34 #define STXXL_MOVE(T) T
69 template <
class ValueType>
102 : m_block_pointers(NULL)
116 : m_block_pointers(block_pointers),
118 m_block_items(block_items)
137 return &(operator * ());
144 const size_t block_index = index / m_block_items;
145 const size_t local_index = index % m_block_items;
147 assert(block_index < m_block_pointers->size());
148 assert((*m_block_pointers)[block_index].first + local_index
149 < (*m_block_pointers)[block_index].second);
151 return *((*m_block_pointers)[block_index].first + local_index);
160 if (
UNLIKELY(m_current == (*m_block_pointers)[m_block_index].second)) {
161 if (m_block_index + 1 < m_block_pointers->size()) {
162 m_current = (*m_block_pointers)[++m_block_index].first;
166 assert(m_block_index + 1 == m_block_pointers->size());
167 m_current = (*m_block_pointers)[m_block_index++].second;
179 if (m_block_index >= m_block_pointers->size()
180 || m_current == (*m_block_pointers)[m_block_index].first) {
182 assert(m_block_index > 0);
183 assert(m_block_index <= m_block_pointers->size());
184 m_current = (*m_block_pointers)[--m_block_index].second - 1;
195 return self_type(m_block_pointers, m_block_items, m_index + addend);
205 return self_type(m_block_pointers, m_block_items, m_index - subtrahend);
213 m_index -= subtrahend;
244 return os <<
"[" << i.
m_index <<
"]";
251 m_block_index = m_index / m_block_items;
252 const size_t local_index = m_index % m_block_items;
254 if (m_block_index < m_block_pointers->size()) {
255 m_current = (*m_block_pointers)[m_block_index].first + local_index;
256 assert(m_current <= (*m_block_pointers)[m_block_index].second);
260 assert(m_block_index == m_block_pointers->size());
261 assert(local_index == 0);
274 template <
class ValueType>
307 : m_values(), m_min_index(min_index), m_level(level),
310 std::swap(m_values, values);
312 m_block_pointers[0] = std::make_pair(&(*m_values.begin()), &(*m_values.begin()) + m_values.size());
347 return m_values[m_min_index];
359 return (m_values.size() - 1);
365 return (m_min_index >= m_values.size());
371 m_min_index = m_values.size();
377 return (m_values.size() - m_min_index);
383 return m_values.size();
402 return int_memory(m_values.capacity());
409 return iterator(&m_block_pointers, capacity(), m_min_index);
416 return iterator(&m_block_pointers, capacity(), capacity());
420 template <
class ExternalArrayType>
461 block_size = BlockSize,
465 static const bool debug =
false;
538 m_num_blocks((size_t)
div_ceil(m_capacity, block_items)),
543 m_bids(m_num_blocks),
544 m_blocks(m_num_blocks, reinterpret_cast<
block_type*>(1)),
545 m_block_pointers(m_num_blocks),
546 m_requests(m_num_blocks, NULL),
547 m_minima(m_num_blocks),
557 m_old_unhinted_block(0)
559 assert(m_capacity > 0);
562 bm->
new_blocks(AllocStrategy(), m_bids.begin(), m_bids.end());
582 m_write_phase(false),
589 m_old_unhinted_block(0)
630 if (m_size == 0)
return;
639 for (
size_t i = block_index; i < end_block_index; ++i) {
645 for (
size_t i = end_block_index; i < m_unhinted_block; ++i) {
646 STXXL_DEBUG(
"ea[" <<
this <<
"]: discarding prefetch hint on"
649 m_requests[i]->cancel();
650 m_requests[i]->wait();
660 block_manager::get_instance()->delete_blocks(i_begin, m_bids.end());
663 for (
size_t i = block_index; i < end_block_index; ++i)
664 assert(m_blocks[i] == NULL);
682 return (m_size == 0);
701 size_t num_blocks =
div_ceil(capacity, block_items);
704 + num_blocks *
sizeof(
typename bid_vector::value_type)
705 + num_blocks *
sizeof(
typename block_vector::value_type)
706 + num_blocks *
sizeof(
typename block_pointers_type::value_type)
707 + num_blocks *
sizeof(
typename request_vector::value_type)
708 + num_blocks *
sizeof(
typename minima_vector::value_type);
714 return int_memory(m_capacity);
720 return (m_end_index - m_index);
729 if (m_end_index % block_items != 0) ++end_block_index;
730 assert(end_block_index <= m_num_blocks);
732 return end_block_index;
738 return (m_index / block_items);
746 return iterator(&m_block_pointers, block_items, m_index);
754 return iterator(&m_block_pointers, block_items, m_end_index);
766 return (get_end_block_index() < m_num_blocks);
773 assert(get_end_block_index() < m_num_blocks);
774 return m_minima[get_end_block_index()];
784 for (
unsigned_type i = block_index; i < end_block_index; ++i) {
785 result = result && block_valid(i);
794 assert(i < m_capacity);
795 const size_t block_index = i / block_items;
796 const size_t local_index = i % block_items;
797 assert(i < m_capacity);
798 assert(block_valid(block_index));
799 return m_blocks[block_index]->elem[local_index];
809 if (write_blocks == 0) write_blocks = 1;
813 if (write_blocks < config::get_instance()->disks_number())
814 write_blocks = config::get_instance()->disks_number();
815 #if STXXL_DEBUG_ASSERTIONS
817 write_blocks = 2 * write_blocks;
821 write_blocks <<
" blocks = " <<
822 write_blocks * block_size / 1024 / 1024 <<
" MiB");
833 prepare_write_pool(*m_pool, num_threads);
842 assert(m_blocks[i] == NULL);
848 m_unhinted_block = 0;
850 m_write_phase =
false;
861 assert(block_index < m_num_blocks);
862 assert(m_blocks[block_index] == NULL ||
863 m_blocks[block_index] == reinterpret_cast<block_type*>(1));
865 if (m_blocks[block_index] == reinterpret_cast<block_type*>(1))
869 m_blocks[block_index] = m_pool->
steal();
875 "read_block needs to re-read block index=" << block_index);
877 static bool s_warned =
false;
882 "writer requested to re-read block from EM.");
883 STXXL_ERRMSG(
"This should never occur in full-performance mode, "
884 "verify that you run in debug mode.");
889 m_blocks[block_index] = m_pool->
steal();
890 request_ptr req = m_pool->
read(m_blocks[block_index], m_bids[block_index]);
893 assert(m_blocks[block_index]);
902 assert(block_index < m_num_blocks);
903 assert(m_blocks[block_index] != NULL &&
904 m_blocks[block_index] != reinterpret_cast<block_type*>(1));
908 std::min<internal_size_type>(block_items, m_capacity - block_index * (
external_size_type)block_items);
910 STXXL_DEBUG(
"ea[" <<
this <<
"]: write_block index=" << block_index <<
911 " this_block_items=" << this_block_items);
913 assert(this_block_items > 0);
914 block_type& this_block = *m_blocks[block_index];
916 m_minima[block_index] = this_block[0];
919 m_pool->
write(m_blocks[block_index], m_bids[block_index]);
921 m_blocks[block_index] = NULL;
932 assert(m_unhinted_block < m_num_blocks);
935 size_t i = m_unhinted_block++;
937 STXXL_DEBUG(
"ea[" <<
this <<
"]: prefetching block_index=" << i);
940 assert(m_blocks[i] == NULL);
945 m_requests[i] = m_pool->
read(m_blocks[i], m_bids[i]);
952 return (m_unhinted_block < m_num_blocks);
959 assert(m_unhinted_block < m_num_blocks);
960 return m_minima[m_unhinted_block];
966 assert(get_end_block_index() <= m_unhinted_block);
967 return m_unhinted_block - get_end_block_index();
978 m_old_unhinted_block = m_unhinted_block;
979 m_unhinted_block = get_end_block_index();
980 assert(get_end_block_index() <= m_old_unhinted_block);
986 assert(m_unhinted_block < m_num_blocks);
991 " block_index=" << m_unhinted_block);
1000 for (
size_t i = m_unhinted_block; i < m_old_unhinted_block; ++i) {
1001 STXXL_DEBUG(
"ea[" <<
this <<
"]: discarding prefetch hint on"
1003 m_requests[i]->cancel();
1004 m_requests[i]->wait();
1017 for (
size_t i = m_old_unhinted_block; i < m_unhinted_block; ++i)
1019 STXXL_DEBUG(
"ea[" <<
this <<
"]: perform real-hinting of"
1023 assert(m_blocks[i] == NULL);
1025 m_requests[i] = m_pool->
read(m_blocks[i], m_bids[i]);
1040 size_t begin = get_end_block_index(), i = begin;
1043 " block index=" << i <<
1044 " end_index=" << m_end_index);
1046 assert(has_em_data());
1048 assert(i < m_unhinted_block);
1049 assert(m_bids[i].valid());
1050 assert(m_requests[i].valid());
1053 m_requests[i]->wait();
1054 assert(m_requests[i]->poll());
1055 assert(m_blocks[i]);
1057 update_block_pointers(i);
1061 while (i < m_unhinted_block && m_requests[i]->poll())
1064 " block index=" << i <<
1065 " end_index=" << m_end_index);
1066 m_requests[i]->wait();
1067 assert(m_requests[i]->poll());
1068 assert(m_blocks[i]);
1070 update_block_pointers(i);
1083 size_t begin = get_end_block_index(), i = begin;
1084 while (i < m_unhinted_block)
1086 STXXL_DEBUG(
"wait_all_hinted_blocks(): ea[" <<
this <<
"]: waiting for" <<
1087 " block index=" << i <<
1088 " end_index=" << m_end_index);
1089 m_requests[i]->wait();
1090 assert(m_requests[i]->poll());
1091 assert(m_blocks[i]);
1092 update_block_pointers(i);
1102 return get_end_block_index() - (m_index / block_items);
1109 assert(m_index + n <= m_capacity);
1110 assert(m_index + n <= m_end_index);
1111 assert(m_size >= n);
1113 STXXL_DEBUG(
"ea[" <<
this <<
"]: remove " << n <<
" items");
1118 const size_t block_index = m_index / block_items;
1120 const size_t index_after = m_index + n;
1121 size_t block_index_after = index_after / block_items;
1122 size_t local_index_after = index_after % block_items;
1124 if (m_size == n && local_index_after != 0)
1125 ++block_index_after;
1127 assert(block_index_after <= m_num_blocks);
1130 bid_iterator i_end = m_bids.begin() + block_index_after;
1131 assert(i_begin <= i_end);
1132 block_manager::get_instance()->delete_blocks(i_begin, i_end);
1134 for (
size_t i = block_index; i < block_index_after; ++i) {
1135 assert(block_valid(i));
1140 m_index = index_after;
1143 unsigned_type blocks_freed = block_index_after - block_index;
1145 STXXL_DEBUG(
"ea[" <<
this <<
"]: after remove:" <<
1146 " index_after=" << index_after <<
1147 " block_index_after=" << block_index_after <<
1148 " local_index_after=" << local_index_after <<
1149 " blocks_freed=" << blocks_freed <<
1150 " num_blocks=" << m_num_blocks <<
1151 " capacity=" << m_capacity);
1153 assert(block_index_after <= m_num_blocks);
1155 assert(block_index_after <= get_end_block_index());
1157 return blocks_freed;
1166 if (!m_write_phase) {
1167 if (block_index >= m_num_blocks)
return false;
1168 return (m_requests[block_index] && m_requests[block_index]->poll());
1171 return (
bool)m_blocks[block_index];
1180 STXXL_DEBUG(
"ea[" <<
this <<
"]: updating block pointers for " << block_index);
1182 m_block_pointers[block_index].first = m_blocks[block_index]->begin();
1183 if (block_index + 1 != m_num_blocks)
1184 m_block_pointers[block_index].second = m_blocks[block_index]->end();
1186 m_block_pointers[block_index].second =
1187 m_block_pointers[block_index].first
1188 + (m_capacity - block_index * block_items);
1190 assert(m_block_pointers[block_index].first != NULL);
1191 assert(m_block_pointers[block_index].second != NULL);
1196 size_t mod = m_capacity % block_items;
1197 return (mod > 0) ? mod : (size_t)block_items;
1227 template <
class ExternalArrayType>
1242 static const bool debug =
false;
1272 assert(block_index < m_ea.num_blocks());
1273 unsigned int ref = m_ref_count[block_index]++;
1279 STXXL_DEBUG(
"get_block_ref block_index=" << block_index <<
1280 " ref=" << ref <<
" reading.");
1281 m_ea.read_block(block_index);
1284 STXXL_DEBUG(
"get_block_ref block_index=" << block_index <<
1288 return m_ea.m_blocks[block_index];
1297 assert(block_index < m_ea.num_blocks());
1299 assert(m_ref_total > 0);
1302 unsigned int ref = --m_ref_count[block_index];
1305 STXXL_DEBUG(
"free_block_ref block_index=" << block_index <<
1306 " ref=" << ref <<
" written.");
1307 m_ea.write_block(block_index);
1310 STXXL_DEBUG(
"free_block_ref block_index=" << block_index <<
1337 static const size_t block_items = ea_type::block_items;
1340 static const bool debug =
false;
1368 : m_writer(NULL), m_live(false), m_index(0)
1377 STXXL_DEBUG(
"Construct iterator for index " << m_index);
1382 : m_writer(other.m_writer),
1384 m_index(other.m_index)
1386 STXXL_DEBUG(
"Copy-Construct iterator for index " << m_index);
1397 m_writer->free_block_ref(m_block_index);
1409 if (!m_live)
return;
1411 m_writer->free_block_ref(m_block_index);
1413 STXXL_DEBUG(
"Destruction of iterator for index " << m_index <<
1414 " in block " << m_index / block_items);
1430 m_block_index = m_index / block_items;
1431 m_current = m_index % block_items;
1433 STXXL_DEBUG(
"operator*() live request for index=" << m_index <<
1434 " block_index=" << m_block_index <<
1435 " m_current=" << m_current);
1438 m_block = m_writer->get_block_ref(m_block_index);
1448 return (*m_block)[m_current];
1454 return &(operator * ());
1461 if (
UNLIKELY(!m_live))
return *
this;
1465 if (
LIKELY(m_current != block_items))
return *
this;
1468 m_writer->free_block_ref(m_block_index);
1476 return self_type(m_writer, m_index + addend);
1480 return self_type(m_writer, m_index - subtrahend);
1516 m_ref_count(ea.num_blocks(), 0)
1523 if (num_threads == 0)
1524 num_threads = omp_get_max_threads();
1526 if (num_threads == 0)
1530 m_ea.prepare_write(num_threads);
1536 double step = (double)m_ea.capacity() / (double)num_threads;
1537 m_live_boundary.resize(num_threads - 1);
1539 for (
unsigned int i = 0; i < num_threads - 1; ++i)
1543 " in block " << index / ea_type::block_items);
1544 m_live_boundary[i] =
iterator(
this, index);
1545 m_live_boundary[i].make_live();
1551 m_live_boundary.clear();
1555 m_ea.finish_write();
1565 return iterator(
this, m_ea.capacity());
1576 template <
class ParentType>
1589 static const unsigned initial_ia_size = 2;
1590 static const unsigned initial_ea_size = 2;
1614 return m_proc[m_parent.m_heaps.top()]->insertion_heap[0];
1616 return m_ias[m_parent.m_ia.top()].get_min();
1618 return m_parent.m_parent.m_extract_buffer[
1619 m_parent.m_parent.m_extract_buffer_index
1626 bool operator () (
const int a,
const int b)
const
1628 return m_compare(get_value(a), get_value(b));
1639 : m_proc(proc), m_compare(compare)
1644 return m_proc[index]->insertion_heap[0];
1647 bool operator () (
const int a,
const int b)
const
1649 return m_compare(get_value(a), get_value(b));
1660 : m_ias(ias), m_compare(compare)
1663 bool operator () (
const int a,
const int b)
const
1665 return m_compare(m_ias[a].get_min(), m_ias[b].get_min());
1698 m_compare(parent.m_inv_compare),
1700 m_head_comp(*this, parent.m_proc,
1701 parent.m_internal_arrays, m_compare),
1702 m_heaps_comp(parent.m_proc, m_compare),
1703 m_ia_comp(parent.m_internal_arrays, m_compare),
1705 m_head(3, m_head_comp),
1706 m_heaps(m_parent.m_num_insertion_heaps, m_heaps_comp),
1707 m_ia(initial_ia_size, m_ia_comp)
1711 std::pair<unsigned, unsigned>
top()
1713 unsigned type = m_head.top();
1717 return std::make_pair(HEAP, m_heaps.top());
1719 return std::make_pair(IA, m_ia.top());
1721 return std::make_pair(EB, 0);
1723 return std::make_pair(TYPE_ERROR, 0);
1730 m_heaps.notify_change(index);
1731 m_head.notify_change(HEAP);
1737 m_head.notify_change(EB);
1743 m_ia.notify_change(index);
1744 m_head.notify_change(IA);
1750 m_ia.activate_player(index);
1751 m_head.notify_change(IA);
1757 m_heaps.deactivate_player(index);
1758 if (!m_heaps.empty())
1759 m_head.notify_change(HEAP);
1761 m_head.deactivate_player(HEAP);
1767 m_head.deactivate_player(EB);
1773 m_ia.deactivate_player(index);
1775 m_head.notify_change(IA);
1777 m_head.deactivate_player(IA);
1784 m_head.deactivate_player(HEAP);
1790 m_ia.resize_and_clear(initial_ia_size);
1791 m_head.deactivate_player(IA);
1796 if (!m_parent.m_internal_arrays.empty())
1798 m_ia.resize_and_rebuild(m_parent.m_internal_arrays.size());
1799 m_head.notify_change(IA);
1803 m_head.deactivate_player(IA);
1810 return m_ia.num_slots();
1816 std::ostringstream ss;
1817 ss <<
"Head:" << std::endl << m_head.to_string() << std::endl;
1818 ss <<
"Heaps:" << std::endl << m_heaps.to_string() << std::endl;
1819 ss <<
"IA:" << std::endl << m_ia.to_string() << std::endl;
1827 m_head.print_stats();
1829 m_heaps.print_stats();
1861 class CompareType = std::less<ValueType>,
1864 uint64 DefaultMemSize = 1* 1024L* 1024L* 1024L,
1890 static const bool debug =
false;
1913 block_size, DefaultMemSize, MaxItems> >;
1926 return compare(y, x);
1951 static const bool c_merge_sorted_heaps =
true;
1955 static const unsigned c_num_write_buffer_blocks = 14;
1959 static const unsigned c_num_reserved_external_arrays = 10;
1963 static const size_type c_default_single_heap_ram = 1L * 1024L * 1024L;
1986 static const bool c_limit_extract_buffer =
true;
1990 static const unsigned c_single_insert_limit = 100;
2005 return m_insertion_heap_capacity *
sizeof(
value_type);
2124 : m_eas(eas), m_compare(compare) { }
2126 bool operator () (
const size_t& a,
const size_t& b)
const
2128 return m_compare(m_eas[a].get_next_block_min(),
2129 m_eas[b].get_next_block_min());
2131 } m_external_min_comparator;
2146 : m_eas(eas), m_compare(compare) { }
2148 bool operator () (
const size_t& a,
const size_t& b)
const
2150 return m_compare(m_eas[a].get_next_hintable_min(),
2151 m_eas[b].get_next_hintable_min());
2153 } m_hint_comparator;
2174 {
return a.
empty(); }
2180 {
return a.
empty(); }
2188 m_internal_arrays.end(),
2192 ia != m_internal_arrays.end(); ++ia)
2194 m_mem_left += ia->int_memory();
2195 --m_internal_levels[ia->level()];
2198 if (swap_end != m_internal_arrays.end())
2200 " cleaned=" << m_internal_arrays.end() - swap_end);
2202 m_internal_arrays.erase(swap_end, m_internal_arrays.end());
2203 m_minima.rebuild_internal_arrays();
2215 ea_iterator first = m_external_arrays.begin();
2216 ea_iterator last = m_external_arrays.end();
2217 ea_iterator swap_end = first;
2218 size_t size = m_external_arrays.end() - m_external_arrays.begin();
2219 size_t first_removed = size;
2220 while (first != last)
2225 swap(*first, *swap_end);
2228 else if (first_removed >= size)
2230 first_removed = first - m_external_arrays.begin();
2236 for (ea_iterator ea = swap_end; ea != last; ++ea) {
2237 m_mem_left += ea->int_memory();
2238 --m_external_levels[ea->level()];
2241 size_t swap_end_index = swap_end - m_external_arrays.begin();
2245 for (
size_t i = size; i != first_removed; ) {
2247 m_external_min_tree.deactivate_player_step(i);
2249 m_hint_tree.deactivate_player_step(i);
2253 for (
size_t i = first_removed; i < swap_end_index; ++i) {
2254 update_external_min_tree(i);
2256 update_hint_tree(i);
2259 STXXL_DEBUG(
"Removed " << m_external_arrays.end() - swap_end <<
2260 " empty external arrays.");
2262 m_external_arrays.erase(swap_end, m_external_arrays.end());
2273 template <
typename RandomAccessIterator,
typename HeapCompareType>
2275 push_heap(RandomAccessIterator first, RandomAccessIterator last,
2276 HeapCompareType comp)
2278 typedef typename std::iterator_traits<RandomAccessIterator>::value_type
2286 while (index > 0 && comp(*(first + parent), value))
2288 *(first + index) =
STXXL_MOVE(*(first + parent));
2290 parent = (index - 1) / 2;
2328 float num_read_blocks_per_ea = 1.5f,
2329 unsigned_type num_write_buffer_blocks = c_num_write_buffer_blocks,
2331 size_type single_heap_ram = c_default_single_heap_ram,
2333 : c_max_internal_level_size(64),
2334 c_max_external_level_size(64),
2336 m_inv_compare(m_compare),
2339 m_num_insertion_heaps(num_insertion_heaps > 0 ? num_insertion_heaps : omp_get_max_threads()),
2341 m_num_insertion_heaps(num_insertion_heaps > 0 ? num_insertion_heaps : 1),
2343 m_insertion_heap_capacity(single_heap_ram / sizeof(
value_type)),
2344 m_mem_total(total_ram),
2345 m_mem_for_heaps(m_num_insertion_heaps * single_heap_ram),
2346 m_num_read_blocks_per_ea(num_read_blocks_per_ea),
2347 m_num_read_blocks(0),
2348 m_num_hinted_blocks(0),
2349 m_num_used_read_blocks(0),
2351 m_in_bulk_push(false),
2352 m_is_very_large_bulk(false),
2353 m_extract_buffer_index(0),
2356 m_extract_buffer_size(0),
2360 m_proc(m_num_insertion_heaps),
2361 m_pool(0, num_write_buffer_blocks),
2362 m_external_arrays(),
2364 m_external_min_comparator(m_external_arrays, m_inv_compare),
2365 m_external_min_tree(4, m_external_min_comparator),
2366 m_hint_comparator(m_external_arrays, m_inv_compare),
2367 m_hint_tree(4, m_hint_comparator),
2369 m_limit_extract(false)
2372 if (!omp_get_nested()) {
2374 if (!omp_get_nested()) {
2375 STXXL_ERRMSG(
"Could not enable OpenMP's nested parallelism, "
2376 "however, the PPQ requires this OpenMP feature.");
2381 STXXL_ERRMSG(
"You are using stxxl::parallel_priority_queue without "
2382 "support for OpenMP parallelism.");
2383 STXXL_ERRMSG(
"This is probably not what you want, so check the "
2384 "compilation settings.");
2387 if (c_limit_extract_buffer) {
2388 m_extract_buffer_limit = (extract_buffer_ram > 0)
2390 :
static_cast<size_type>(((double)(m_mem_total) * c_default_extract_buffer_ram_part /
sizeof(
value_type)));
2394 m_internal_levels[i] = 0;
2397 m_external_levels[i] = 0;
2402 m_mem_left = m_mem_total - 2 * m_mem_for_heaps;
2406 #pragma omp parallel for
2408 for (
long p = 0; p < m_num_insertion_heaps; ++p)
2412 assert(m_proc[p]->insertion_heap.capacity() *
sizeof(
value_type)
2413 == insertion_heap_int_memory());
2416 m_mem_left -= m_num_insertion_heaps * insertion_heap_int_memory();
2422 external_array_type::prepare_write_pool(m_pool, m_num_insertion_heaps);
2423 m_mem_left -= m_pool.size_write() * block_size;
2426 if (c_merge_sorted_heaps) {
2427 m_internal_arrays.reserve(m_mem_total / m_mem_for_heaps);
2430 m_internal_arrays.reserve(m_mem_total * m_num_insertion_heaps / m_mem_for_heaps);
2434 m_external_arrays.reserve(c_num_reserved_external_arrays);
2436 if (m_mem_total < m_mem_left)
2438 STXXL_ERRMSG(
"Minimum memory requirement insufficient, "
2439 "increase PPQ's memory limit or decrease buffers.");
2451 for (
size_t p = 0; p < m_num_insertion_heaps; ++p)
2468 mem_used += 2 * m_mem_for_heaps
2469 + m_pool.size_write() * block_size
2470 + m_pool.free_size_prefetch() * block_size
2471 + m_num_hinted_blocks * block_size
2472 + m_num_used_read_blocks * block_size;
2476 size_t num_hinted = 0, num_used_read = 0;
2477 for (
size_t i = 0; i < m_external_arrays.size(); ++i) {
2478 num_hinted += m_external_arrays[i].num_hinted_blocks();
2479 num_used_read += m_external_arrays[i].num_used_blocks();
2483 STXXL_CHECK(num_used_read == m_num_used_read_blocks);
2487 - m_pool.free_size_prefetch()
2488 - m_num_hinted_blocks);
2494 for (
int_type p = 0; p < m_num_insertion_heaps; ++p)
2500 m_proc[p]->insertion_heap.end(),
2503 STXXL_CHECK(m_proc[p]->insertion_heap.capacity() <= m_insertion_heap_capacity);
2505 heaps_size += m_proc[p]->insertion_heap.size();
2506 mem_used += m_proc[p]->insertion_heap.capacity() *
sizeof(
value_type);
2509 if (!m_in_bulk_push)
2516 std::vector<unsigned_type> ia_levels(c_max_internal_levels, 0);
2519 m_internal_arrays.begin(); ia != m_internal_arrays.end(); ++ia)
2521 ia_size += ia->size();
2522 ia_memory += ia->int_memory();
2523 ++ia_levels[ia->level()];
2527 mem_used += ia_memory;
2536 std::vector<unsigned_type> ea_levels(c_max_external_levels, 0);
2539 m_external_arrays.begin(); ea != m_external_arrays.end(); ++ea)
2541 ea_size += ea->size();
2542 ea_memory += ea->int_memory();
2543 ++ea_levels[ea->level()];
2547 mem_used += ea_memory;
2566 return m_heaps_size + m_internal_size + m_external_size + m_extract_buffer_size;
2572 return (size() == 0);
2578 assert(m_mem_total >= m_mem_left);
2579 return (m_mem_total - m_mem_left);
2586 return (m_extract_buffer_size == 0);
2601 assert(!m_in_bulk_push);
2602 m_in_bulk_push =
true;
2603 m_bulk_first_delayed_external_array = m_external_arrays.size();
2605 size_type heap_capacity = m_num_insertion_heaps * m_insertion_heap_capacity;
2609 if (bulk_size > heap_capacity && 0) {
2610 m_is_very_large_bulk =
true;
2613 m_is_very_large_bulk =
false;
2615 if (bulk_size + m_heaps_size > heap_capacity) {
2616 if (m_heaps_size > 0) {
2623 for (
int_type p = 0; p < m_num_insertion_heaps; ++p)
2624 m_proc[p]->heap_add_size = 0;
2636 assert(m_in_bulk_push);
2638 heap_type& insheap = m_proc[p]->insertion_heap;
2640 if (!m_is_very_large_bulk && 0)
2644 if (
UNLIKELY(insheap.size() >= m_insertion_heap_capacity)) {
2648 m_heaps_size += m_proc[p]->heap_add_size;
2650 m_proc[p]->heap_add_size = 0;
2651 flush_insertion_heap(p);
2654 assert(insheap.size() < insheap.capacity());
2657 insheap.push_back(element);
2658 std::push_heap(insheap.begin(), insheap.end(), m_compare);
2660 else if (!m_is_very_large_bulk && 1)
2665 if (
UNLIKELY(insheap.size() >= m_insertion_heap_capacity)) {
2669 m_heaps_size += m_proc[p]->heap_add_size;
2671 m_proc[p]->heap_add_size = 0;
2672 flush_insertion_heap(p);
2675 assert(insheap.size() < insheap.capacity());
2678 insheap.push_back(element);
2682 if (
UNLIKELY(insheap.size() >= 2 * 1024 * 1024)) {
2686 m_heaps_size += m_proc[p]->heap_add_size;
2688 m_proc[p]->heap_add_size = 0;
2689 flush_insertion_heap(p);
2692 assert(insheap.size() < insheap.capacity());
2695 insheap.push_back(element);
2698 m_proc[p]->heap_add_size++;
2711 return bulk_push(element, (
unsigned_type)omp_get_thread_num());
2714 return bulk_push(element,
id);
2724 assert(m_in_bulk_push);
2725 m_in_bulk_push =
false;
2727 if (!m_is_very_large_bulk && 0)
2729 for (
int_type p = 0; p < m_num_insertion_heaps; ++p)
2731 m_heaps_size += m_proc[p]->heap_add_size;
2733 if (!m_proc[p]->insertion_heap.empty())
2734 m_minima.update_heap(p);
2737 else if (!m_is_very_large_bulk && 1)
2740 #pragma omp parallel for
2742 for (
int_type p = 0; p < m_num_insertion_heaps; ++p)
2745 for (
unsigned_type index = m_proc[p]->heap_add_size; index != 0; ) {
2746 std::push_heap(m_proc[p]->insertion_heap.begin(),
2747 m_proc[p]->insertion_heap.end() - (--index),
2754 m_heaps_size += m_proc[p]->heap_add_size;
2757 for (
int_type p = 0; p < m_num_insertion_heaps; ++p)
2759 if (!m_proc[p]->insertion_heap.empty())
2760 m_minima.update_heap(p);
2766 #pragma omp parallel for
2768 for (
int_type p = 0; p < m_num_insertion_heaps; ++p)
2770 if (m_proc[p]->insertion_heap.size() >= m_insertion_heap_capacity) {
2775 m_heaps_size += m_proc[p]->heap_add_size;
2777 m_proc[p]->heap_add_size = 0;
2778 flush_insertion_heap(p);
2782 for (
unsigned_type index = m_proc[p]->heap_add_size; index != 0; ) {
2783 std::push_heap(m_proc[p]->insertion_heap.begin(),
2784 m_proc[p]->insertion_heap.end() - (--index),
2791 m_heaps_size += m_proc[p]->heap_add_size;
2792 m_proc[p]->heap_add_size = 0;
2796 for (
int_type p = 0; p < m_num_insertion_heaps; ++p)
2798 if (!m_proc[p]->insertion_heap.empty())
2799 m_minima.update_heap(p);
2803 if (m_bulk_first_delayed_external_array != m_external_arrays.size()) {
2804 STXXL_DEBUG(
"bulk_push_end: run delayed re-hinting of EAs");
2805 rebuild_hint_tree();
2812 void bulk_pop(std::vector<value_type>& out,
size_t max_size)
2814 STXXL_DEBUG(
"bulk_pop_size with max_size=" << max_size);
2816 const size_t n_elements = std::min<size_t>(max_size, size());
2817 assert(n_elements < m_extract_buffer_limit);
2819 if (m_heaps_size > 0)
2820 flush_insertion_heaps();
2822 convert_eb_into_ia();
2824 refill_extract_buffer(n_elements, n_elements);
2828 swap(m_extract_buffer, out);
2829 m_extract_buffer_index = 0;
2830 m_extract_buffer_size = 0;
2831 m_minima.deactivate_extract_buffer();
2845 STXXL_DEBUG(
"bulk_pop_limit with limit=" << limit);
2847 convert_eb_into_ia();
2849 if (m_heaps_size > 0) {
2851 flush_insertion_heaps();
2853 flush_insertion_heaps_with_limit(limit);
2856 size_type ias = m_internal_arrays.size();
2857 size_type eas = m_external_arrays.size();
2858 std::vector<size_type> sizes(eas + ias);
2859 std::vector<iterator_pair_type> sequences(eas + ias);
2862 int limiting_ea_index = m_external_min_tree.top();
2866 bool has_full_range =
true;
2869 while (limiting_ea_index > -1)
2872 m_external_arrays[limiting_ea_index].get_next_block_min();
2874 if (m_compare(ea_limit, this_limit)) {
2879 if (m_external_arrays[limiting_ea_index].num_hinted_blocks() == 0) {
2881 this_limit = ea_limit;
2882 has_full_range =
false;
2886 wait_next_ea_blocks(limiting_ea_index);
2888 limiting_ea_index = m_external_min_tree.top();
2893 for (
size_type i = 0; i < eas + ias; ++i) {
2897 assert(!m_external_arrays[i].empty());
2898 assert(m_external_arrays[i].valid());
2899 begin = m_external_arrays[i].begin();
2900 end = m_external_arrays[i].end();
2904 assert(!(m_internal_arrays[j].empty()));
2905 begin = m_internal_arrays[j].begin();
2906 end = m_internal_arrays[j].end();
2909 end = std::lower_bound(begin, end, this_limit, m_inv_compare);
2911 sizes[i] = std::distance(begin, end);
2912 sequences[i] = std::make_pair(begin, end);
2915 output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
2916 if (output_size > max_size) {
2917 output_size = max_size;
2918 has_full_range =
false;
2920 out.resize(output_size);
2923 " sequences=" << sequences.size() <<
2924 " output_size=" << output_size <<
2925 " has_full_range=" << has_full_range);
2928 sequences.begin(), sequences.end(),
2929 out.begin(), output_size, m_inv_compare);
2931 advance_arrays(sequences, sizes, eas, ias);
2935 return has_full_range;
2938 #if TODO_MAYBE_FIXUP_LATER
2944 void bulk_push_vector(std::vector<value_type>& elements)
2946 size_type heap_capacity = m_num_insertion_heaps * m_insertion_heap_capacity;
2947 if (elements.size() > heap_capacity / 2) {
2948 flush_array(elements);
2952 bulk_push_begin(elements.size());
2954 #pragma omp parallel
2956 const unsigned thread_num = omp_get_thread_num();
2957 #pragma omp parallel for
2958 for (size_type i = 0; i < elements.size(); ++i) {
2959 bulk_push(elements[i], thread_num);
2963 const unsigned thread_num = m_rng() % m_num_insertion_heaps;
2964 for (size_type i = 0; i < elements.size(); ++i) {
2965 bulk_push(elements[i], thread_num);
2986 m_aggregated_pushes.push_back(element);
2989 #if TODO_MAYBE_FIXUP_LATER
2994 void flush_aggregated_pushes()
2996 size_type size = m_aggregated_pushes.size();
2997 size_type ram_internal = 2 * size *
sizeof(value_type);
2998 size_type heap_capacity = m_num_insertion_heaps * m_insertion_heap_capacity;
3000 if (ram_internal > m_mem_for_heaps / 2) {
3001 flush_array(m_aggregated_pushes);
3003 else if ((m_aggregated_pushes.size() > c_single_insert_limit) && (m_aggregated_pushes.size() < heap_capacity)) {
3004 bulk_push_vector(m_aggregated_pushes);
3007 for (value_iterator i = m_aggregated_pushes.begin(); i != m_aggregated_pushes.end(); ++i) {
3012 m_aggregated_pushes.clear();
3027 assert(!m_in_bulk_push && !m_limit_extract);
3029 heap_type& insheap = m_proc[p]->insertion_heap;
3031 if (insheap.size() >= m_insertion_heap_capacity) {
3032 flush_insertion_heap(p);
3036 insheap.push_back(element);
3037 unsigned_type index = push_heap(insheap.begin(), insheap.end(),
3041 if (insheap.size() == 1 || index == 0)
3042 m_minima.update_heap(p);
3048 assert(!m_in_bulk_push && !m_limit_extract);
3051 if (extract_buffer_empty()) {
3052 refill_extract_buffer(
std::min(m_extract_buffer_limit,
3053 m_internal_size + m_external_size));
3056 static const bool debug =
false;
3058 std::pair<unsigned, unsigned> type_and_index = m_minima.top();
3059 const unsigned& type = type_and_index.first;
3060 const unsigned& index = type_and_index.second;
3065 case minima_type::HEAP:
3067 ": " << m_proc[index]->insertion_heap[0]);
3068 return m_proc[index]->insertion_heap[0];
3069 case minima_type::IA:
3071 ": " << m_internal_arrays[index].get_min());
3072 return m_internal_arrays[index].get_min();
3073 case minima_type::EB:
3075 ": " << m_extract_buffer[m_extract_buffer_index]);
3076 return m_extract_buffer[m_extract_buffer_index];
3086 assert(!m_in_bulk_push && !m_limit_extract);
3088 m_stats.num_extracts++;
3090 if (extract_buffer_empty()) {
3091 refill_extract_buffer(
std::min(m_extract_buffer_limit,
3092 m_internal_size + m_external_size));
3095 m_stats.extract_min_time.start();
3097 std::pair<unsigned, unsigned> type_and_index = m_minima.top();
3098 unsigned type = type_and_index.first;
3099 unsigned index = type_and_index.second;
3104 case minima_type::HEAP:
3106 heap_type& insheap = m_proc[index]->insertion_heap;
3108 m_stats.pop_heap_time.start();
3109 std::pop_heap(insheap.begin(), insheap.end(), m_compare);
3111 m_stats.pop_heap_time.stop();
3115 if (!insheap.empty())
3116 m_minima.update_heap(index);
3118 m_minima.deactivate_heap(index);
3122 case minima_type::IA:
3124 m_internal_arrays[index].inc_min();
3127 if (!(m_internal_arrays[index].empty()))
3128 m_minima.update_internal_array(index);
3131 m_minima.deactivate_internal_array(index);
3135 case minima_type::EB:
3137 ++m_extract_buffer_index;
3138 assert(m_extract_buffer_size > 0);
3139 --m_extract_buffer_size;
3141 if (!extract_buffer_empty())
3142 m_minima.update_extract_buffer();
3144 m_minima.deactivate_extract_buffer();
3153 m_stats.extract_min_time.stop();
3177 m_limit_extract =
true;
3178 m_limit_element = limit;
3180 std::vector<value_type> new_extract_buffer;
3181 m_limit_has_full_range =
3182 bulk_pop_limit(new_extract_buffer, limit, m_extract_buffer_limit);
3183 std::swap(new_extract_buffer, m_extract_buffer);
3185 m_extract_buffer_index = 0;
3186 m_extract_buffer_size = m_extract_buffer.size();
3187 if (m_extract_buffer_size)
3188 m_minima.update_extract_buffer();
3190 m_minima.deactivate_extract_buffer();
3192 bulk_push_begin(bulk_size);
3198 assert(m_limit_extract);
3199 assert(!m_compare(m_limit_element, element));
3201 return bulk_push(element, p);
3207 assert(m_limit_extract);
3211 if (m_extract_buffer_size == 0 && m_limit_has_full_range)
3212 return m_limit_element;
3214 if (extract_buffer_empty())
3217 std::vector<value_type> new_extract_buffer;
3218 m_limit_has_full_range =
3219 bulk_pop_limit(new_extract_buffer, m_limit_element,
3220 m_extract_buffer_limit);
3222 std::swap(new_extract_buffer, m_extract_buffer);
3224 m_extract_buffer_index = 0;
3225 m_extract_buffer_size = m_extract_buffer.size();
3226 if (m_extract_buffer_size)
3227 m_minima.update_extract_buffer();
3229 m_minima.deactivate_extract_buffer();
3232 return m_extract_buffer[m_extract_buffer_index];
3238 assert(m_limit_extract);
3240 ++m_extract_buffer_index;
3241 assert(m_extract_buffer_size > 0);
3242 --m_extract_buffer_size;
3244 if (extract_buffer_empty() && !m_limit_has_full_range)
3247 std::vector<value_type> new_extract_buffer;
3248 m_limit_has_full_range =
3249 bulk_pop_limit(new_extract_buffer, m_limit_element,
3250 m_extract_buffer_limit);
3252 std::swap(new_extract_buffer, m_extract_buffer);
3254 m_extract_buffer_index = 0;
3255 m_extract_buffer_size = m_extract_buffer.size();
3256 if (m_extract_buffer_size)
3257 m_minima.update_extract_buffer();
3259 m_minima.deactivate_extract_buffer();
3266 assert(m_limit_extract);
3270 m_limit_extract =
false;
3282 std::vector<unsigned_type> back_size(m_num_insertion_heaps);
3287 for (
size_t p = 0; p < m_num_insertion_heaps; ++p)
3289 heap_type& insheap = m_proc[p]->insertion_heap;
3291 typename heap_type::iterator back = insheap.end();
3293 while (back != insheap.begin() &&
3294 m_compare(limit, insheap[0]))
3298 std::pop_heap(insheap.begin(), back, m_compare);
3304 for (
typename heap_type::const_iterator it = insheap.begin();
3305 it != insheap.end(); ++it)
3308 assert(!m_compare(limit, *it));
3310 assert(m_compare(limit, *it));
3313 back_size[p] = insheap.end() - back;
3320 STXXL_DEBUG(
"flush_insertion_heaps_with_limit(): back_sum = " << back_sum);
3325 flush_ia_ea_until_memory_free(back_sum *
sizeof(
value_type));
3327 std::vector<value_type> values(back_sum);
3330 typename std::vector<value_type>::iterator vi = values.begin();
3331 for (
size_t p = 0; p < m_num_insertion_heaps; ++p)
3333 heap_type& insheap = m_proc[p]->insertion_heap;
3335 std::copy(insheap.end() - back_size[p], insheap.end(), vi);
3337 insheap.resize(insheap.size() - back_size[p]);
3339 if (insheap.empty())
3340 m_minima.deactivate_heap(p);
3342 m_minima.update_heap(p);
3347 add_as_internal_array(values);
3348 m_heaps_size -= back_sum;
3359 STXXL_ERRMSG(
"Merging external arrays. This should not happen."
3360 <<
" You should adjust memory assignment and/or external array level size.");
3361 check_external_level(0,
true);
3367 if (!m_in_bulk_push)
3368 rebuild_hint_tree();
3370 assert(m_external_arrays.size() - 1 >= m_bulk_first_delayed_external_array);
3379 if (m_mem_left >= mem_free)
return;
3381 if (m_internal_size > 0) {
3382 flush_internal_arrays();
3385 merge_external_arrays();
3388 assert(m_mem_left >= mem_free);
3396 m_num_read_blocks_per_ea * m_external_arrays.size();
3399 " m_num_read_blocks=" << m_num_read_blocks <<
3400 " ea_size=" << m_external_arrays.size() <<
3401 " m_num_read_blocks_per_ea=" << m_num_read_blocks_per_ea <<
3402 " new_num_read_blocks=" << new_num_read_blocks <<
3403 " free_size_prefetch=" << m_pool.free_size_prefetch() <<
3404 " m_num_hinted_blocks=" << m_num_hinted_blocks <<
3405 " m_num_used_read_blocks=" << m_num_used_read_blocks);
3408 if (new_num_read_blocks > m_num_read_blocks)
3411 (new_num_read_blocks - m_num_read_blocks) * block_size;
3417 while (new_num_read_blocks > m_num_read_blocks) {
3419 m_pool.add_prefetch(new_block);
3420 ++m_num_read_blocks;
3423 m_mem_left -= mem_needed;
3427 if (new_num_read_blocks < m_num_read_blocks)
3429 while (new_num_read_blocks < m_num_read_blocks &&
3430 m_pool.free_size_prefetch() > 0)
3432 block_type* del_block = m_pool.steal_prefetch();
3434 --m_num_read_blocks;
3435 m_mem_left += block_size;
3438 if (new_num_read_blocks < m_num_read_blocks)
3439 STXXL_ERRMSG(
"WARNING: could not immediately reduce read/prefetch pool!");
3447 m_stats.hint_time.start();
3450 for (
size_t i = 0; i < m_external_arrays.size(); ++i)
3451 m_external_arrays[i].rebuild_hints_prepare();
3454 for (
size_t i = 0; i < m_external_arrays.size(); ++i)
3456 if (m_external_arrays[i].has_unhinted_em_data()) {
3457 m_hint_tree.activate_without_replay(i);
3460 m_hint_tree.deactivate_without_replay(i);
3463 m_hint_tree.rebuild();
3467 m_pool.free_size_prefetch() + m_num_hinted_blocks;
3468 m_num_hinted_blocks = 0;
3471 while (free_prefetch_blocks > 0 &&
3472 (gmin_index = m_hint_tree.top()) >= 0)
3474 assert((
size_t)gmin_index < m_external_arrays.size());
3476 STXXL_DEBUG(
"Give pre-hint in EA[" << gmin_index <<
"] min " <<
3477 m_external_arrays[gmin_index].get_next_hintable_min());
3479 m_external_arrays[gmin_index].rebuild_hints_prehint_next_block();
3480 --free_prefetch_blocks;
3481 ++m_num_hinted_blocks;
3483 if (m_external_arrays[gmin_index].has_unhinted_em_data()) {
3484 m_hint_tree.replay_on_change(gmin_index);
3487 m_hint_tree.deactivate_player(gmin_index);
3492 for (
size_t i = 0; i < m_external_arrays.size(); ++i)
3493 m_external_arrays[i].rebuild_hints_cancel();
3496 for (
size_t i = 0; i < m_external_arrays.size(); ++i)
3497 m_external_arrays[i].rebuild_hints_finish();
3499 assert(free_prefetch_blocks == m_pool.free_size_prefetch());
3501 m_stats.hint_time.stop();
3509 m_stats.hint_time.start();
3510 if (m_external_arrays[ea_index].has_unhinted_em_data()) {
3511 m_hint_tree.replay_on_change(ea_index);
3514 m_hint_tree.deactivate_player(ea_index);
3516 m_stats.hint_time.stop();
3524 if (m_external_arrays[ea_index].has_em_data()) {
3525 m_external_min_tree.replay_on_change(ea_index);
3528 m_external_min_tree.deactivate_player(ea_index);
3536 m_stats.hint_time.start();
3539 " for free_size_prefetch=" << m_pool.free_size_prefetch());
3542 while (m_pool.free_size_prefetch() > 0 &&
3543 (gmin_index = m_hint_tree.top()) >= 0)
3545 assert((
size_t)gmin_index < m_external_arrays.size());
3547 STXXL_DEBUG(
"Give hint in EA[" << gmin_index <<
"]");
3548 m_external_arrays[gmin_index].hint_next_block();
3549 ++m_num_hinted_blocks;
3551 if (m_external_arrays[gmin_index].has_unhinted_em_data()) {
3552 m_hint_tree.replay_on_change(gmin_index);
3555 m_hint_tree.deactivate_player(gmin_index);
3559 m_stats.hint_time.stop();
3569 if (c_limit_extract_buffer) {
3587 m_minima.print_stats();
3599 std::vector<iterator_pair_type>& sequences,
3600 bool reuse_previous_lower_bounds =
false)
3604 static const bool debug =
false;
3606 const size_type eas = m_external_arrays.size();
3607 const size_type ias = m_internal_arrays.size();
3609 assert(sizes.size() == eas + ias);
3610 assert(sequences.size() == eas + ias);
3616 int gmin_index = m_external_min_tree.top();
3617 bool needs_limit = (gmin_index >= 0) ?
true :
false;
3620 #ifdef STXXL_DEBUG_ASSERTIONS
3622 bool test_needs_limit =
false;
3623 int test_gmin_index = 0;
3626 m_stats.refill_minmax_time.start();
3628 if (m_external_arrays[i].has_em_data()) {
3630 m_external_arrays[i].get_next_block_min();
3632 if (!test_needs_limit) {
3633 test_needs_limit =
true;
3634 test_gmin_value = min_value;
3635 test_gmin_index = i;
3639 " test: " << test_gmin_value <<
3640 ": " << m_inv_compare(min_value, test_gmin_value));
3641 if (m_inv_compare(min_value, test_gmin_value)) {
3642 test_gmin_value = min_value;
3643 test_gmin_index = i;
3648 m_stats.refill_minmax_time.stop();
3651 STXXL_ASSERT(!needs_limit || gmin_index == test_gmin_index);
3662 for (
size_type i = 0; i < eas + ias; ++i) {
3666 begin = m_external_arrays[i].begin();
3667 end = m_external_arrays[i].end();
3671 begin = m_internal_arrays[j].begin();
3672 end = m_internal_arrays[j].end();
3677 m_external_arrays[gmin_index].get_next_block_min();
3681 if (reuse_previous_lower_bounds) {
3684 end = std::lower_bound(sequences[i].second, end,
3685 gmin_value, m_inv_compare);
3689 end = std::lower_bound(begin, end,
3690 gmin_value, m_inv_compare);
3695 sizes[i] = std::distance(begin, end);
3696 sequences[i] = std::make_pair(begin, end);
3698 STXXL_DEBUG(
"sequence[" << i <<
"] " << (i < eas ?
"ea " :
"ia ") <<
3699 begin <<
" - " << end <<
3700 " size " << sizes[i] <<
3701 (needs_limit ?
" with ub limit" :
""));
3705 STXXL_DEBUG(
"return with needs_limit: gmin_index=" << gmin_index);
3709 STXXL_DEBUG(
"return with needs_limit: eas=" << eas);
3718 if (m_extract_buffer_size == 0)
return;
3725 m_limit_has_full_range =
false;
3730 flush_ia_ea_until_memory_free(
3731 internal_array_type::int_memory(m_extract_buffer.size())
3734 if (m_extract_buffer_size == 0)
return;
3737 m_minima.deactivate_extract_buffer();
3740 add_as_internal_array(m_extract_buffer, m_extract_buffer_index);
3742 m_extract_buffer_index = 0;
3743 m_extract_buffer_size = 0;
3752 size_t maximum_size = 0)
3755 " ia_size=" << m_internal_arrays.size() <<
3756 " ea_size=" << m_external_arrays.size());
3758 if (maximum_size == 0)
3759 maximum_size = m_extract_buffer_limit;
3763 assert(extract_buffer_empty());
3764 m_extract_buffer_index = 0;
3766 cleanup_external_arrays();
3768 size_type ias, eas = m_external_arrays.size();
3770 m_minima.clear_internal_arrays();
3771 cleanup_internal_arrays();
3772 ias = m_internal_arrays.size();
3774 if (eas == 0 && ias == 0) {
3775 m_extract_buffer.resize(0);
3776 m_minima.deactivate_extract_buffer();
3780 m_stats.num_extract_buffer_refills++;
3781 m_stats.refill_extract_buffer_time.start();
3782 m_stats.refill_time_before_merge.start();
3784 std::vector<size_type> sizes(eas + ias);
3785 std::vector<iterator_pair_type> sequences(eas + ias);
3788 if (minimum_size > 0) {
3789 size_t limiting_ea_index = eas + 1;
3790 bool reuse_lower_bounds =
false;
3791 while (output_size < minimum_size)
3794 " output_size=" << output_size <<
3795 " minimum_size=" << minimum_size <<
3796 " limiting_ea_index=" << limiting_ea_index);
3798 if (limiting_ea_index < eas) {
3799 if (m_external_arrays[limiting_ea_index].num_hinted_blocks() == 0)
3802 wait_next_ea_blocks(limiting_ea_index);
3803 reuse_lower_bounds =
true;
3805 else if (limiting_ea_index == eas) {
3807 STXXL_MSG(
"Warning: refill_extract_buffer(n): "
3808 "minimum_size > # mergeable elements!");
3812 limiting_ea_index = calculate_merge_sequences(
3813 sizes, sequences, reuse_lower_bounds);
3815 output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
3819 calculate_merge_sequences(sizes, sequences);
3820 output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
3823 if (c_limit_extract_buffer) {
3824 output_size = std::min<size_t>(output_size, maximum_size);
3827 m_stats.max_extract_buffer_size.set_max(output_size);
3828 m_stats.total_extract_buffer_size += output_size;
3830 assert(output_size > 0);
3831 m_extract_buffer.resize(output_size);
3832 m_extract_buffer_size = output_size;
3834 m_stats.refill_time_before_merge.stop();
3835 m_stats.refill_merge_time.start();
3838 sequences.begin(), sequences.end(),
3839 m_extract_buffer.begin(), output_size, m_inv_compare);
3841 m_stats.refill_merge_time.stop();
3842 m_stats.refill_time_after_merge.start();
3844 advance_arrays(sequences, sizes, eas, ias);
3846 m_minima.update_extract_buffer();
3848 m_stats.refill_time_after_merge.stop();
3849 m_stats.refill_extract_buffer_time.stop();
3859 m_external_arrays[ea_index].wait_next_blocks();
3861 m_num_hinted_blocks -= used_blocks;
3862 m_num_used_read_blocks += used_blocks;
3864 update_external_min_tree(ea_index);
3869 std::vector<size_type>& sizes,
3870 size_t eas,
size_t ias)
3874 for (
size_type i = 0; i < eas + ias; ++i) {
3876 size_type dist = std::distance(sequences[i].first,
3877 sequences[i].second);
3878 const size_t diff = sizes[i] - dist;
3879 if (diff == 0)
continue;
3884 m_external_arrays[i].remove_items(diff);
3886 m_num_used_read_blocks -= freed_blocks;
3887 total_freed_blocks += freed_blocks;
3890 assert(m_external_size >= diff);
3891 m_external_size -= diff;
3895 m_internal_arrays[j].inc_min(diff);
3896 assert(m_internal_size >= diff);
3897 m_internal_size -= diff;
3903 cleanup_external_arrays();
3906 if (total_freed_blocks)
3907 hint_external_arrays();
3909 m_stats.num_new_external_arrays = 0;
3910 cleanup_internal_arrays();
3916 assert(m_proc[p]->insertion_heap.size() != 0);
3918 heap_type& insheap = m_proc[p]->insertion_heap;
3919 size_t size = insheap.size();
3922 "Flushing insertion heap array p=" << p <<
3923 " size=" << insheap.size() <<
3924 " capacity=" << insheap.capacity() <<
3925 " int_memory=" << internal_array_type::int_memory(insheap.size()) <<
3926 " mem_left=" << m_mem_left);
3928 m_stats.num_insertion_heap_flushes++;
3932 std::sort(insheap.begin(), insheap.end(), m_inv_compare);
3935 #pragma omp critical (stxxl_flush_insertion_heap)
3940 flush_ia_ea_until_memory_free(
3941 internal_array_type::int_memory(insheap.size()));
3944 m_minima.deactivate_heap(p);
3947 add_as_internal_array(insheap);
3950 insheap.reserve(m_insertion_heap_capacity);
3951 assert(insheap.capacity() *
sizeof(
value_type)
3952 == insertion_heap_int_memory());
3958 m_heaps_size -= size;
3961 m_stats.insertion_heap_flush_time += flush_time;
3969 if (c_merge_sorted_heaps) {
3970 max_mem_needed = m_mem_for_heaps;
3973 max_mem_needed = insertion_heap_int_memory();
3978 flush_ia_ea_until_memory_free(max_mem_needed);
3980 m_stats.num_insertion_heap_flushes++;
3981 m_stats.insertion_heap_flush_time.start();
3986 std::vector<std::pair<value_iterator, value_iterator> > sequences(m_num_insertion_heaps);
3989 #pragma omp parallel for
3991 for (
long i = 0; i < m_num_insertion_heaps; ++i)
3993 heap_type& insheap = m_proc[i]->insertion_heap;
3995 std::sort(insheap.begin(), insheap.end(), m_inv_compare);
3997 if (c_merge_sorted_heaps)
3998 sequences[i] = std::make_pair(insheap.begin(), insheap.end());
4000 int_memory += insheap.capacity();
4003 if (c_merge_sorted_heaps)
4005 m_stats.merge_sorted_heaps_time.start();
4006 std::vector<value_type> merged_array(size);
4009 sequences.begin(), sequences.end(),
4010 merged_array.begin(), size, m_inv_compare);
4012 m_stats.merge_sorted_heaps_time.stop();
4014 add_as_internal_array(merged_array);
4016 for (
int_type i = 0; i < m_num_insertion_heaps; ++i)
4018 m_proc[i]->insertion_heap.clear();
4019 m_proc[i]->insertion_heap.reserve(m_insertion_heap_capacity);
4021 m_minima.clear_heaps();
4025 for (
unsigned i = 0; i < m_num_insertion_heaps; ++i)
4027 heap_type& insheap = m_proc[i]->insertion_heap;
4029 if (insheap.size() == 0)
continue;
4031 add_as_internal_array(insheap);
4034 insheap.reserve(m_insertion_heap_capacity);
4037 m_minima.clear_heaps();
4042 m_stats.insertion_heap_flush_time.stop();
4051 " num_arrays=" << m_internal_arrays.size());
4053 m_stats.num_internal_array_flushes++;
4054 m_stats.internal_array_flush_time.start();
4056 m_minima.clear_internal_arrays();
4059 convert_eb_into_ia(
true);
4062 cleanup_internal_arrays();
4064 size_type num_arrays = m_internal_arrays.size();
4067 std::vector<iterator_pair_type> sequences(num_arrays);
4069 for (
unsigned i = 0; i < num_arrays; ++i)
4071 sequences[i] = std::make_pair(m_internal_arrays[i].begin(),
4072 m_internal_arrays[i].end());
4074 int_memory += m_internal_arrays[i].int_memory();
4079 if (int_memory < external_array_type::int_memory(size)
4080 + ceil(m_num_read_blocks_per_ea) * block_size)
4082 return merge_external_arrays();
4089 m_stats.max_merge_buffer_size.set_max(size);
4095 sequences.begin(), sequences.end(),
4096 external_array_writer.
begin(), size, m_inv_compare);
4101 m_external_arrays.swap_back(ea);
4103 m_internal_size = 0;
4104 m_external_size += size;
4108 m_external_min_tree.activate_without_replay(m_external_arrays.size() - 1);
4109 update_external_min_tree(m_external_arrays.size() - 1);
4112 m_hint_tree.activate_without_replay(m_external_arrays.size() - 1);
4113 if (!m_in_bulk_push)
4114 update_hint_tree(m_external_arrays.size() - 1);
4117 m_internal_arrays.clear();
4118 m_stats.num_new_internal_arrays = 0;
4119 cleanup_internal_arrays();
4122 for (
size_t i = 0; i < c_max_internal_levels; ++i)
4123 m_internal_levels[i] = 0;
4125 m_mem_left += int_memory;
4126 m_mem_left -= m_external_arrays.back().int_memory();
4128 m_stats.max_num_external_arrays.set_max(m_external_arrays.size());
4129 m_stats.internal_array_flush_time.stop();
4132 ++m_external_levels[0];
4133 check_external_level(0);
4137 if (!m_in_bulk_push)
4138 rebuild_hint_tree();
4140 assert(m_external_arrays.size() - 1 >= m_bulk_first_delayed_external_array);
4153 const std::vector<unsigned_type>& indices)
4154 : m_eas(eas), m_indices(indices), m_compare(compare) { }
4156 bool operator () (
const size_t& a,
const size_t& b)
const
4158 return m_compare(m_eas[m_indices[a]].get_next_hintable_min(),
4159 m_eas[m_indices[b]].get_next_hintable_min());
4167 if (!force_merge_all)
4171 if (m_external_levels[level] < c_max_external_level_size && !force_merge_all)
4176 std::vector<unsigned_type> ea_index;
4178 for (
unsigned_type i = 0; i < m_external_arrays.size(); ++i)
4180 if (m_external_arrays[i].level() != level && !force_merge_all)
continue;
4181 if (m_external_arrays[i].empty())
continue;
4183 level_size += m_external_arrays[i].size();
4184 int_memory += m_external_arrays[i].int_memory();
4185 ea_index.push_back(i);
4191 if (m_mem_left < external_array_type::int_memory(level_size) && !force_merge_all)
4193 m_mem_left -= external_array_type::int_memory(level_size);
4195 STXXL_ASSERT(force_merge_all || c_max_external_level_size == ea_index.size());
4199 " level=" << level <<
4200 " level_size=" << level_size <<
4201 " sequences=" << num_arrays_to_merge <<
4202 " force_merge_all=" << force_merge_all);
4206 unsigned_type new_level = force_merge_all ? c_max_external_levels - 1 : level + 1;
4213 = external_array_writer.
begin();
4218 m_inv_compare, ea_index);
4221 min_tree_comparator);
4227 while (num_arrays_to_merge != num_arrays_done)
4229 STXXL_DEBUG(
"num_arrays_done = " << num_arrays_done);
4233 for (
int_type i = 0; i < num_arrays_to_merge; ++i) {
4234 if (m_external_arrays[ea_index[i]].has_unhinted_em_data()) {
4245 while (m_mem_left >= block_size) {
4247 m_pool.add_prefetch(new_block);
4248 ++m_num_read_blocks;
4249 m_mem_left -= block_size;
4254 for (
unsigned_type i = 0; i < m_external_arrays.size(); ++i) {
4255 m_external_arrays[i].rebuild_hints_prepare();
4260 m_pool.free_size_prefetch() + m_num_hinted_blocks;
4261 m_num_hinted_blocks = 0;
4263 int gmin_index_index;
4264 while (free_prefetch_blocks > 0 &&
4265 (gmin_index_index = min_tree.
top()) >= 0)
4267 const unsigned_type gmin_index = ea_index[gmin_index_index];
4268 assert(gmin_index < m_external_arrays.size());
4270 STXXL_DEBUG0(
"check_external_level():Give pre-hint in EA[" << gmin_index <<
"] min " <<
4271 m_external_arrays[gmin_index].get_next_hintable_min());
4273 m_external_arrays[gmin_index].rebuild_hints_prehint_next_block();
4274 --free_prefetch_blocks;
4275 ++m_num_hinted_blocks;
4277 if (m_external_arrays[gmin_index].has_unhinted_em_data()) {
4287 for (
size_t i = 0; i < m_external_arrays.size(); ++i)
4288 m_external_arrays[i].rebuild_hints_cancel();
4292 for (
size_t i = 0; i < m_external_arrays.size(); ++i)
4293 m_external_arrays[i].rebuild_hints_finish();
4295 assert(free_prefetch_blocks == m_pool.free_size_prefetch());
4300 for (
size_type i = 0; i < num_arrays_to_merge; ++i) {
4304 m_external_arrays[index].wait_all_hinted_blocks();
4306 m_num_hinted_blocks -= used_blocks;
4307 m_num_used_read_blocks += used_blocks;
4312 std::vector<iterator_pair_type> sequences(num_arrays_to_merge);
4313 std::vector<size_type> sizes(num_arrays_to_merge);
4315 gmin_index_index = min_tree.
top();
4316 bool needs_limit = (gmin_index_index >= 0) ?
true :
false;
4318 for (
size_type i = 0; i < num_arrays_to_merge; ++i) {
4320 iterator begin = m_external_arrays[index].begin();
4321 iterator end = m_external_arrays[index].end();
4324 const unsigned_type gmin_index = ea_index[gmin_index_index];
4326 m_external_arrays[gmin_index].get_next_block_min();
4328 end = std::lower_bound(begin, end,
4329 gmin_value, m_inv_compare);
4332 sizes[i] = std::distance(begin, end);
4333 sequences[i] = std::make_pair(begin, end);
4336 begin <<
" - " << end <<
4337 " size " << sizes[i] <<
4338 (needs_limit ?
" with ub limit" :
""));
4344 size_type output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
4347 sequences.begin(), sequences.end(),
4348 out_iter, output_size, m_inv_compare);
4353 if (!m_external_arrays[index].empty()) {
4356 m_external_arrays[index].remove_items(sizes[i]);
4358 m_num_used_read_blocks -= freed_blocks;
4360 if (m_external_arrays[index].empty())
4373 m_bulk_first_delayed_external_array = 0;
4377 cleanup_external_arrays();
4379 m_external_arrays.swap_back(ea);
4380 ++m_external_levels[new_level];
4383 m_external_min_tree.activate_without_replay(m_external_arrays.size() - 1);
4384 update_external_min_tree(m_external_arrays.size() - 1);
4387 m_hint_tree.activate_without_replay(m_external_arrays.size() - 1);
4388 if (!m_in_bulk_push)
4389 update_hint_tree(m_external_arrays.size() - 1);
4394 if (!force_merge_all)
4395 check_external_level(level + 1);
4407 const size_t size = values.size();
4408 const size_t capacity = values.capacity();
4409 assert(size > used);
4413 internal_array_type::int_memory(capacity));
4414 m_internal_arrays.swap_back(new_array);
4416 if (!extract_buffer_empty()) {
4417 m_stats.num_new_internal_arrays++;
4418 m_stats.max_num_new_internal_arrays.set_max(
4419 m_stats.num_new_internal_arrays);
4420 m_minima.add_internal_array(
4421 static_cast<unsigned>(m_internal_arrays.size()) - 1
4425 m_internal_size += size - used;
4426 m_mem_left -= internal_array_type::int_memory(capacity);
4429 "Internal array level is larger than anything possible "
4430 "in this universe. Increase the size of m_internal_levels");
4432 ++m_internal_levels[level];
4434 m_stats.max_num_internal_arrays.set_max(m_internal_arrays.size());
4437 if (m_internal_levels[level] < c_max_internal_level_size)
return;
4441 std::vector<iterator_pair_type> sequences;
4442 std::vector<unsigned_type> ia_index;
4444 for (
unsigned_type i = 0; i < m_internal_arrays.size(); ++i)
4446 if (m_internal_arrays[i].level() != level)
continue;
4447 if (m_internal_arrays[i].empty())
continue;
4449 level_size += m_internal_arrays[i].size();
4450 int_memory += m_internal_arrays[i].int_memory();
4451 sequences.push_back(std::make_pair(m_internal_arrays[i].begin(),
4452 m_internal_arrays[i].end()));
4453 ia_index.push_back(i);
4457 if (m_mem_left < internal_array_type::int_memory(level_size))
return;
4460 STXXL_ASSERT(int_memory >= internal_array_type::int_memory(level_size));
4463 " level=" << level <<
4464 " level_size=" << level_size <<
4465 " sequences=" << sequences.size());
4467 std::vector<value_type> merged_array(level_size);
4470 sequences.begin(), sequences.end(),
4471 merged_array.begin(), level_size, m_inv_compare);
4477 m_internal_arrays[ia].make_empty();
4483 cleanup_internal_arrays();
4486 m_internal_size -= level_size;
4489 add_as_internal_array(merged_array, 0, level + 1);
4503 flush_ia_ea_until_memory_free(
4504 internal_array_type::int_memory(values.size())
4507 add_as_internal_array(values);
4654 <<
"hint_time=" << o.
hint_time << std::endl;
4665 class AllocStrategy,
4671 DefaultMemSize, MaxItems>::c_default_extract_buffer_ram_part = 0.05;
4675 #endif // !STXXL_CONTAINERS_PARALLEL_PRIORITY_QUEUE_HEADER
bool empty() const
Returns true if the array is empty.
const float m_num_read_blocks_per_ea
Number of read/prefetch blocks per external array.
void add_prefetch(block_type *&block)
Add block to prefetch pool.
size_type size() const
The number of elements in the queue.
const long m_num_insertion_heaps
Number of insertion heaps. Usually equal to the number of CPUs.
void bulk_push_end()
Ends a sequence of push operations.
parent_type::proc_vector_type proc_vector_type
void flush_insertion_heap(unsigned_type p)
Flushes the insertions heap p into an internal array.
#define STXXL_ASSERT(condition)
stats_counter num_extracts
Total number of extracts.
parent_type::inv_compare_type compare_type
size_t num_used_blocks() const
Returns the number of blocks loaded in RAM.
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
#define STXXL_DEFAULT_BLOCK_SIZE(type)
unsigned_type m_bulk_first_delayed_external_array
First index in m_external_arrays that was not re-hinted during a bulk_push sequence.
void swap(internal_array &o)
Swap internal_array with another one.
internal_size_type m_current
pointer to the current element inside the referenced block. undefined while m_live is false...
void rebuild_hints_prehint_next_block()
Advance m_unhinted_block index without actually prefetching.
stats_counter num_new_internal_arrays
Temporary number of new internal arrays at the same time (which were created while the extract buffer...
parent_type & m_parent
The priority queue.
void rebuild_hint_tree()
Rebuild hint tree completely as the hint sequence may have changed, and re-hint the correct block seq...
unsigned_type get_end_block_index() const
Returns the block beyond the block in which *(m_end_index-1) is located.
void flush_insertion_heaps()
Flushes all insertions heaps into an internal array.
head_comp(self_type &parent, proc_vector_type &proc, ias_type &ias, const compare_type &compare)
external_size_type get_index() const
return the current absolute index inside the external array.
bool extract_buffer_empty() const
Returns if the extract buffer is empty.
unsigned_type wait_all_hinted_blocks()
Waits until all hinted blocks are read into RAM. Returns how many blocks were successfully read...
size_type m_extract_buffer_index
Index of the currently smallest element in the extract buffer.
friend std::ostream & operator<<(std::ostream &os, const uint_pair &a)
make a uint_pair outputtable via iostreams, using unsigned long long.
external_size_type m_capacity
The total size of the external array in items. Cannot be changed after construction.
bool is_heap(RandomAccessIterator first, RandomAccessIterator last, StrictWeakOrdering comp=std::less< typename std::iterator_traits< RandomAccessIterator >::value_type >())
Struct of all statistical counters and timers. Turn on/off statistics using the stats_counter and sta...
stats_counter total_extract_buffer_size
Sum of the sizes of each extract buffer refill. Used for average size.
bool valid() const
Returns if the data requested to be in internal memory is completely fetched. True if wait() has been...
stats_counter num_new_external_arrays
Temporary number of new external arrays at the same time (which were created while the extract buffer...
unsigned_type m_num_read_blocks
Total number of read/prefetch buffer blocks.
stats_timer direct_flush_time
Total time for flush_directly_to_hd()
void bulk_push_begin(size_type bulk_size)
Start a sequence of push operations.
std::pair< iterator, iterator > iterator_pair_type
void rebuild_hints_finish()
Perform real-hinting of pre-hinted blocks, since now canceled blocks are available.
size_type m_internal_size
Number of elements in the internal arrays.
size_type m_extract_buffer_limit
Maximum size of extract buffer in number of elements Only relevant if c_limit_extract_buffer==true.
void print_stats() const
Prints statistical data.
size_t size() const
Returns the current size of the array.
stats_counter num_internal_array_flushes
Number of flush_internal_arrays() calls.
external_array(external_size_type size, pool_type *pool, unsigned_type level=0)
Constructs an external array.
bool has_em_data() const
Returns if there is data in EM, that's not randomly accessible.
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
iterator()
default constructor (should not be used directly)
bool bulk_pop_limit(std::vector< value_type > &out, const value_type &limit, size_t max_size=std::numeric_limits< size_t >::max())
Extracts all elements which are greater or equal to a given limit.
writer_type * m_writer
pointer to the external array containing the elements
unsigned_type remove_items(size_t n)
Removes the first n elements from the array. Returns the number of blocks released into the block poo...
unsigned long long int uint64
std::vector< value_type > m_extract_buffer
The extract buffer where external (and internal) arrays are merged into for extracting.
void deactivate_player(unsigned_type index)
deactivate a player and replay.
void update_block_pointers(size_t block_index)
Updates the m_block_pointers vector. Should be called after any steal() or read() operation...
random_number32_r m_rng
Random number generator for randomly selecting a heap in sequential push()
bool m_is_very_large_bulk
If the bulk currently being inserted is very large, this boolean is set and bulk_push just accumulate...
const value_type & get_value(int index) const
size_t calculate_merge_sequences(std::vector< size_type > &sizes, std::vector< iterator_pair_type > &sequences, bool reuse_previous_lower_bounds=false)
Calculates the sequences vector needed by the multiway merger, considering inaccessible data from ext...
unsigned_type c_max_internal_level_size
currently global public tuning parameter:
void flush_insertion_heaps_with_limit(const value_type &limit)
Flushes all elements of the insertion heaps which are greater or equal to a given limit...
bool empty() const
Returns if the queue is empty.
void convert_eb_into_ia(bool do_not_flush=false)
Convert extract buffer into a new internal array.
size_t num_hinted_blocks() const
Returns the number of hinted blocks.
unsigned_type m_level
Level of internal array (Sander's PQ: group number)
ppq_local::minima_tree< parallel_priority_queue< value_type, compare_type, alloc_strategy, block_size, DefaultMemSize, MaxItems > > minima_type
type of minima tree combining the structures
void flush_internal_arrays()
Flushes the internal arrays into an external array.
stats_timer pop_heap_time
Total time for pop_heap() in extract_min(). Part of extract_min_time.
block_type * m_block
pointer to the referenced block. undefined while m_live is false.
const compare_type & compare
void flush_array_internal(std::vector< value_type > &values)
Sorts the values from values and writes them into an internal array.
The class winner_tree is a binary tournament tree.
bool m_in_bulk_push
Flag if inside a bulk_push sequence.
Compares the largest accessible value of two external arrays.
RandomAccessIterator3 multiway_merge(RandomAccessIteratorPairIterator seqs_begin, RandomAccessIteratorPairIterator seqs_end, RandomAccessIterator3 target, DiffType length, Comparator comp)
Multi-way merging dispatcher.
minima_vector m_minima
stores the minimum value of each block
ForwardIterator swap_remove_if(ForwardIterator first, ForwardIterator last, UnaryPredicate pred)
Transforms the range [first,last) into a range with all the elements for which pred returns true remo...
stats_timer refill_extract_buffer_time
Total time for refill_extract_buffer()
winner_tree< hint_comparator > m_hint_tree
Tracks the largest values of the block hinted the latest of the external arrays if there is unaccessi...
void make_live()
allocates a reference to the block's data (possibly reading it from EM).
proc_vector_type m_proc
Array of processor local data structures, including the insertion heaps.
friend void swap(internal_array &a, internal_array &b)
Swap internal_array with another one.
std::vector< unsigned int > m_ref_count
reference counters for the number of live iterators on the corresponding block in external_array...
Inverse comparison functor.
head_comp m_head_comp
Comparator instances.
size_t get_max_index() const
The index of the largest element in the array.
ptrdiff_t difference_type
bool has_unhinted_em_data() const
Returns if there is data in EM, that's not already hinted to the prefetcher.
internal_size_type m_block_index
index of the current element's block in the external array's block list. undefined while m_live is fa...
#define STXXL_DEFAULT_ALLOC_STRATEGY
void replay_on_change(unsigned_type index, bool done=false)
static uint_pair min()
return a uint_pair instance containing the smallest possible value
void update()
updates m_block_index and m_current based on m_index
High-performance smart pointer used as a wrapping reference counting pointer.
ExternalArrayType ea_type
stats_timer merge_sorted_heaps_time
Total time for merging the sorted heaps. Part of flush_insertion_heaps.
bool m_limit_extract
flag if inside a bulk limit extract session
~parallel_priority_queue()
Destructor.
inv_compare_type m_inv_compare
>-Comparator for value_type
ppq_local::internal_array< value_type > internal_array_type
bool m_live
when operator* or operator-> are called, then the iterator goes live and allocates a reference to the ...
void rebuild_hints_prepare()
This method prepares rebuilding the hints (this is done after creating a new EA in order to always ha...
stats_timer refill_wait_time
Total time of wait() calls in first part of refill_extract_buffer(). Part of refill_time_before_merge...
std::vector< value_type > m_aggregated_pushes
The aggregated pushes. They cannot be extracted yet.
parent_type::value_type value_type
request_ptr read(block_type *&block, bid_type bid)
Reads block.
stats_counter max_extract_buffer_size
Largest number of elements in the extract buffer at the same time.
stats_timer external_array_merge_time
Total time for merge_external_arrays()
unsigned_type m_level
Level of external array (Sanders' PQ: group number)
size_t size() const
Returns the current size in items.
Class fake_timer is a drop-in replacement for timer, which does nothing.
minima_type m_minima
The winner tree containing the smallest values of all sources where the globally smallest element cou...
unsigned_type level() const
Returns the level (group number) of the array.
ea_type & m_ea
reference to the external array to be written
read_write_pool< block_type > pool_type
void deactivate_heap(unsigned index)
Remove an insertion heap from the minima tree.
const compare_type & m_compare
void rebuild_internal_arrays()
iterator(writer_type *writer, external_size_type index)
construct a new iterator
mutex m_mutex
mutex for reference counting array (this is actually nicer than openmp's critical) ...
proc_vector_type & m_proc
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
std::random_access_iterator_tag iterator_category
External array stores a sorted sequence of values on the hard disk and allows access to the first blo...
static size_t int_memory(size_t capacity)
Returns memory usage of EA with given capacity, excluding blocks loaded in RAM. Blocks belong to pref...
size_type insertion_heap_int_memory() const
Return size of insertion heap reservation in bytes.
std::vector< iterator > m_live_boundary
optimization: hold live iterators for the expected boundary blocks of multiway_merge().
bid_vector m_bids
The IDs of each block in external memory.
void wait_next_ea_blocks(unsigned_type ea_index)
Requests more EM data from a given EA and updates the winner trees and hints accordingly.
bid_vector::iterator bid_iterator
void cleanup_external_arrays()
Clean up empty external arrays, free their memory and capacity.
std::vector< ProcessorData * > proc_vector_type
size_t int_memory() const
Return the amount of internal memory used by the EA.
stats_timer refill_merge_time
Total time for the merging in refill_extract_buffer(). Part of refill_extract_buffer_time.
ppq_iterator< value_type > iterator
#define STXXL_CHECK_EQUAL(a, b)
STXXL_CHECK_EQUAL(a,b) is an assertion macro for unit tests, similar to STXXL_CHECK(a==b). The difference is that STXXL_CHECK_EQUAL(a,b) also prints the values of a and b. Attention: a and b must be printable with std::cout!
unsigned_type m_old_unhinted_block
The first unhinted block index as it was before the prepare_rebuilding_hints() call. Used for removal of hints which aren't needed anymore.
size_t last_block_items()
void cleanup_internal_arrays()
Clean up empty internal arrays, free their memory and capacity.
unsigned_type top() const
Returns the winner. Remember running replay_on_pop() if the value of the winner changes.
friend void swap(external_array &a, external_array &b)
Swap external_array with another one.
const inv_compare_type & m_compare
ppq_local::external_array< value_type, block_size, AllocStrategy > external_array_type
size_t get_min_index() const
The index of the currently smallest element in the array.
stats_counter max_num_new_internal_arrays
Largest number of new internal arrays at the same time (which were created while the extract buffer h...
#define STXXL_CHECK(condition)
STXXL_CHECK is an assertion macro for unit tests, which contrarily to assert() also works in release ...
const value_type * const_iterator
const compare_type & m_compare
external_size_type m_index
index of the current element, absolute in the external array
external_size_type m_index
The read position in the array.
size_t int_memory() const
Return the amount of internal memory used by the array.
external_array_writer< self_type > writer_type
std::vector< block_type * > block_vector
ppq_iterator< value_type > iterator
request_vector m_requests
The read request pointers are used to wait until the block has been completely fetched.
uint_pair & operator+=(const uint_pair &b)
addition operator (uses 64-bit arithmetic)
block_vector m_blocks
A vector of size m_num_blocks with block_type pointers, some of them will be filled while writing...
pool_type m_pool
Prefetch and write buffer pool for external arrays (has to be in front of m_external_arrays) ...
const std::vector< unsigned_type > & m_indices
size_type m_external_size
Number of elements in the external arrays.
ppq_iterator()
default constructor (should not be used directly)
void merge_external_arrays()
Merges all external arrays and all internal arrays into one external array.
proc_vector_type & m_proc
const external_arrays_type & m_eas
Block containing elements of fixed length.
counting_ptr< request > request_ptr
A reference counting pointer for request.
stats_timer internal_array_flush_time
Total time for flush_internal_arrays()
unsigned_type m_min_index
Index of the current head.
void activate_without_replay(unsigned_type index)
activate a player and resize if necessary
external_array< value_type, BlockSize, AllocStrategy > self_type
size_t buffer_size() const
Returns the number of elements available in internal memory.
void add_as_internal_array(std::vector< value_type > &values, unsigned_type used=0, unsigned_type level=0)
Add new internal array, which requires that values are sorted! automatically decreases m_mem_left! al...
std::vector< request_ptr > request_vector
void refill_extract_buffer(size_t minimum_size=0, size_t maximum_size=0)
Refills the extract buffer from the external arrays.
void update_hint_tree(size_t ea_index)
Updates the prefetch prediction tree after a remove_items(), which frees up blocks.
bool operator!=(const uint_pair &b) const
inequality checking operator
iterator begin() const
Returns a random-access iterator to the begin of the data in internal memory.
void write_block(size_t block_index)
Called by the external_array_writer to write a block from m_blocks[] to disk. Prior to writing and re...
Types
Entries in the head winner tree.
iterator::block_pointers_type block_pointers_type
bool operator<=(const uint_pair &b) const
less-or-equal comparison operator
stats_counter max_num_external_arrays
Largest number of external arrays at the same time.
void update_external_min_tree(size_t ea_index)
Updates the external min tree after a remove() or a wait_next_blocks() call.
void limit_begin(const value_type &limit, size_type bulk_size)
Begin bulk-limit extraction session with limit element.
void make_empty()
Make this array empty.
void bulk_pop(std::vector< value_type > &out, size_t max_size)
Extract up to max_size values at once.
void hint_next_block()
Prefetch the next unhinted block, requires one free read block from the global pool.
ptrdiff_t difference_type
choose_int_types< my_pointer_size >::int_type int_type
static const double c_default_extract_buffer_ram_part
Default limit of the extract buffer ram consumption as share of total ram.
std::vector< BID< BlockSize > > bid_vector
Compares the largest value of the block hinted the latest of two external arrays. ...
void read_block(size_t block_index)
Called by the external_array_writer to read a block from disk into m_blocks[]. If the block is marked...
void add_internal_array(unsigned index)
Add a newly created internal array to the minima tree.
stats_timer insertion_heap_flush_time
Total time for flush_insertion_heaps()
bool operator>(const uint_pair &b) const
greater comparison operator
unsigned_type m_num_hinted_blocks
number of currently hinted prefetch blocks
stats_timer refill_accumulate_time
Total time for std::lower_bound calls in refill_extract_buffer(). Part of refill_extract_buffer_time a...
const value_type & get_next_hintable_min() const
Returns the smallest element of the next hint candidate (the block after the last hinted one)...
iterator end() const
Returns a random-access iterator 1 behind the end of the data in internal memory. ...
stats_counter max_merge_buffer_size
Largest number of elements in the merge buffer when running flush_internal_arrays() ...
iterator end() const
End iterator.
std::pair< unsigned, unsigned > top()
Return smallest items of head winner tree.
unsigned_type wait_next_blocks()
Waits until the next prefetched block is read into RAM, then polls for any further blocks that are do...
void inc_min(size_t diff=1)
Use inc_min(diff) if multiple values have been extracted.
void rebuild_hints_cancel()
Cancel hints which aren't needed anymore from the prefetcher and fixes its size. prepare_rebuilding_...
stats_counter num_direct_flushes
Number of flush_directly_to_hd() calls.
stats_counter max_num_new_external_arrays
Largest number of new external arrays at the same time (which were created while the extract buffer h...
unsigned_type internal_size_type
heap_type insertion_heap
The heaps where new elements are usually inserted into.
const value_type & get_next_block_min() const
Returns the smallest element of the first block NOT in internal memory (or at least requested to be i...
block_pointers_type m_block_pointers
Begin and end pointers for each block, used for merging with ppq_iterator.
AllocStrategy alloc_strategy
A struct containing the local insertion heap and other information local to a processor.
request_ptr write(block_type *&block, bid_type bid)
Passes a block to the pool for writing.
fake_timer stats_timer
Defines if statistics are gathered: fake_timer or timer.
std::string as_memory_amount(const value_type &) const
void clear_heaps()
Remove all insertion heaps from the minima tree.
stats_timer extract_min_time
Total time for extract_min()
size_t capacity() const
Returns the capacity in items.
bool operator>=(const uint_pair &b) const
greater-or-equal comparison operator
minima_tree(parent_type &parent)
Construct the tree of minima sources.
size_t ia_slots() const
Return size of internal arrays minima tree.
winner_tree< heaps_comp > m_heaps
void clear_internal_arrays()
Remove all internal arrays from the minima tree.
void limit_pop()
Remove the minimum element, only works correctly while elements < L.
void flush_ia_ea_until_memory_free(internal_size_type mem_free)
Free up memory by flushing internal arrays and combining external arrays until enough bytes are free...
std::random_access_iterator_tag iterator_category
bool operator<(const uint_pair &b) const
less-than comparison operator
std::vector< value_type > m_values
Contains the items of the sorted sequence.
Acquire a lock that's valid until the end of scope.
internal_arrays_type m_internal_arrays
The sorted arrays in internal memory.
#define STXXL_BEGIN_NAMESPACE
std::vector< std::pair< pointer, pointer > > block_pointers_type
void update_heap(int_type index)
Update minima tree after an item from the heap index was removed.
ea_type::block_type block_type
block_type * steal_prefetch()
Take out a block from the prefetch pool, one unhinted free block must be available.
A random-access iterator class for block oriented data.
void update_extract_buffer()
Update minima tree after an item of the extract buffer was removed.
uint_pair & operator--()
prefix decrement operator (directly manipulates the integer parts)
external_array_writer writer_type
Parallelized External Memory Priority Queue.
static size_t int_memory(size_t capacity)
Return the amount of internal memory used by an array with the capacity in number of items...
unsigned_type m_num_used_read_blocks
number of currently loaded blocks
static unsigned_type push_heap(RandomAccessIterator first, RandomAccessIterator last, HeapCompareType comp)
SiftUp a new element from the last position in the heap, reestablishing the heap invariant.
const inv_compare_type & m_compare
void bulk_push(const value_type &element, const unsigned_type p)
Push an element inside a sequence of pushes.
size_t num_blocks() const
Return the number of blocks.
unsigned_type m_unhinted_block
The first unhinted block index.
internal_array_type::iterator iterator
internal_array(std::vector< value_type > &values, unsigned_type min_index=0, unsigned_type level=0)
Constructor which takes a value vector. The value vector is empty afterwards.
Fast uniform [0, 2^32) pseudo-random generator with period 2^32, random bits: 32. Reentrant variant o...
unsigned_type level() const
Returns the level (group number) of the array.
minima_tree< ParentType > self_type
#define STXXL_DEBUG(x)
STXXL_DEBUG is a macro which prints iff the locally defined variable "debug" is true. Use this for scoped-based debug variables.
void rebuild()
Build the winner tree from scratch.
static uint_pair max()
return a uint_pair instance containing the largest possible value
ExternalArrayType ea_type
parent_type::external_arrays_type eas_type
winner_tree< external_min_comparator > m_external_min_tree
Tracks the largest accessible values of the external arrays if there is unaccessible data in EM...
block_type * steal()
Take out a block from the pool.
const external_arrays_type & m_eas
parent_type::internal_arrays_type ias_type
winner_tree< head_comp > m_head
The winner trees.
const value_type & top()
Access the minimum element.
pointer m_current
pointer to the current element
const compare_type & m_compare
stats_timer refill_minmax_time
Total time for determining the smallest max value in refill_extract_buffer(). Part of refill_extract_b...
external_min_comparator(const external_arrays_type &eas, const inv_compare_type &compare)
void aggregate_push(const value_type &element)
Aggregate pushes.
read_write_pool< block_type > pool_type
typed_block< block_size, value_type > block_type
void prepare_write(unsigned_type num_threads)
prepare the external_array for writing using multiway_merge() with num_threads. this method is called...
stats_timer refill_time_before_merge
Total time for all things before merging in refill_extract_buffer(). Part of refill_extract_buffer_tim...
parallel_priority_queue(const compare_type &compare=compare_type(), size_type total_ram=DefaultMemSize, float num_read_blocks_per_ea=1.5f, unsigned_type num_write_buffer_blocks=c_num_write_buffer_blocks, unsigned_type num_insertion_heaps=0, size_type single_heap_ram=c_default_single_heap_ram, size_type extract_buffer_ram=0)
Constructor.
external_array()
Default constructor. Don't use this directly. Needed for regrowing in surrounding vector.
const size_type m_mem_for_heaps
Size of all insertion heaps together in bytes.
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
size_type memory_consumption() const
The memory consumption in Bytes.
external_arrays_type m_external_arrays
The sorted arrays in external memory.
ea_type::value_type value_type
stxxl::swap_vector< internal_array_type > internal_arrays_type
type of internal arrays vector
size_type heap_add_size
The number of items inserted into the insheap during bulk parallel access.
void deactivate_internal_array(unsigned index)
Remove an internal array from the minima tree.
size_t m_index
index of the current element
pool_type * m_pool
Common prefetch and write buffer pool.
ea_type::value_type value_type
void advance_arrays(std::vector< iterator_pair_type > &sequences, std::vector< size_type > &sizes, size_t eas, size_t ias)
typed_block< BlockSize, value_type > block_type
void limit_push(const value_type &element, const unsigned_type p=0)
Push new item >= bulk-limit element into insertion heap p.
const inv_compare_type & m_compare
std::string to_string() const
Returns a readable representation of the winner tree as string.
const compare_type & m_compare
value_type comparator
stats_counter num_insertion_heap_flushes
Number of flush_insertion_heaps() calls.
winner_tree< ia_comp > m_ia
const value_type & get_value(int input) const
bool m_limit_has_full_range
flag if the extract buffer contains the full limit range
stats_timer refill_time_after_merge
Total time for all things after merging in refill_extract_buffer(). Part of refill_extract_buffer_time...
internal_array()
Default constructor. Don't use this directly. Needed for regrowing in surrounding vector.
void resize_read_pool()
Automatically resize the read/prefetch buffer pool depending on number of external arrays...
uint64 external_size_type
const value_type & get_min()
Returns the smallest element in the array.
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
size_t m_block_index
index of the current element's block
Internal arrays store a sorted sequence of values in RAM, which will be merged together into the dele...
size_t m_block_items
size of each data block
dummy_custom_stats_counter< uint64 > stats_counter
Defines if statistics are gathered: dummy_custom_stats_counter or custom_stats_counter.
void bulk_push(const value_type &element)
Push an element inside a bulk sequence of pushes.
#define STXXL_DEBUG0(x)
STXXL_DEBUG0 is a macro which never prints the debug message, used to temporarily disable STXXL_DEBUG...
std::vector< BID< block_size > > bid_vector
std::vector< value_type > minima_vector
stats_counter max_num_internal_arrays
Largest number of internal arrays at the same time.
void check_invariants() const
Assert many invariants of the data structures.
compare_type m_compare
<-Comparator for value_type
Unary operator which returns true if the external array has run empty.
~external_array()
Destructor.
size_type m_mem_left
Free memory in bytes.
void finish_write()
finish the writing phase after multiway_merge() filled the vector. this method is called by the exter...
void deactivate_extract_buffer()
Remove the extract buffer from the minima tree.
unsigned_type c_max_external_level_size
currently global public tuning parameter:
void limit_end()
Finish bulk-limit extraction session.
ia_comp(ias_type &ias, const compare_type &compare)
iterator(const iterator &other)
copy an iterator, the new iterator is not automatically live!
block_type * get_block_ref(size_t block_index)
read block into memory and increase reference count (called when an iterator goes live on the block)...
void update_internal_array(unsigned index)
Update minima tree after an item from an internal array was removed.
heaps_comp(proc_vector_type &proc, const compare_type &compare)
Comparator for the insertion heaps winner tree.
The minima_tree contains minima from all sources inside the PPQ.
WinnerTree-Comparator for the head winner tree. It accesses all relevant data structures from the pri...
const external_arrays_type & m_eas
bool empty() const
Returns if the array has run empty.
const block_pointers_type * m_block_pointers
pointer to a vector of begin/end pointer pairs. They allow access to the data blocks.
iterator begin() const
Begin iterator.
external_array_writer self_type
value_type m_limit_element
current limit element
void print_stats() const
Print statistics.
s_min_tree_comparator(const external_arrays_type &eas, const inv_compare_type &compare, const std::vector< unsigned_type > &indices)
bid_vector bids_container_type
const unsigned m_insertion_heap_capacity
Capacity of one insertion heap.
void swap(external_array &o)
Swap external_array with another one.
unsigned int m_ref_total
total number of iterators referencing this writer
const value_type & limit_top()
Access the minimum element, which can only be in the extract buffer.
const value_type & get_min() const
The currently smallest element in the array.
size_type m_heaps_size
Number of elements in the insertion heaps.
stats_counter num_external_array_merges
Number of merge_external_arrays() calls.
hint_comparator(const external_arrays_type &eas, const inv_compare_type &compare)
external_size_type m_end_index
The index behind the last element that is located in RAM (or is at least requested to be so) ...
std::vector< value_type > heap_type
type of insertion heap itself
Comparator for the internal arrays winner tree.
stats_counter num_extract_buffer_refills
Number of refill_extract_buffer() calls.
static void prepare_write_pool(pool_type &pool, unsigned_type num_threads)
prepare the pool for writing external arrays with given number of threads
external_size_type m_size
The total number of elements minus the number of extracted values.
void hint_external_arrays()
Hints EA blocks which will be needed soon. Hints at most m_num_prefetchers blocks globally...
size_t capacity() const
Returns the initial size of the array.
size_type m_extract_buffer_size
Number of elements in the extract buffer.
void push(const value_type &element, unsigned_type p=0)
Insert new element.
bool block_valid(size_t block_index) const
Returns if the block with the given index is completely fetched.
bool m_write_phase
Is array in write phase? True = write phase, false = read phase.
void resize_write(size_type new_size)
Resizes size of the pool.
Unary operator which returns true if the internal array has run empty.
block_pointers_type m_block_pointers
Begin and end pointers of the array. This is used by the iterator.
size_type size_write() const
Returns number of blocks owned by the write_pool.
void pop()
Remove the minimum element.
void free_block_ref(size_t block_index)
decrease reference count on the block, and possibly write it to disk (called when an iterator release...
unsigned_type m_num_blocks
Number of blocks, again: calculated at construction time.
external_array_writer(ea_type &ea, unsigned int num_threads=0)
void deactivate_without_replay(unsigned_type index)
deactivate a player
external_array_type::writer_type external_array_writer_type
bool operator==(const uint_pair &b) const
equality checking operator
ppq_iterator(const block_pointers_type *block_pointers, size_t block_items, size_t index)
constructor
void check_external_level(unsigned_type level, bool force_merge_all=false)
Merges external arrays if there are too many external arrays on the same level.
#define STXXL_END_NAMESPACE
size_t get_index() const
returns the value's index in the internal or external array
inv_compare_type(const compare_type &c)
const size_type m_mem_total
Total amount of internal memory.
unsigned_type get_current_block_index() const
Returns the block in which m_index is located.
std::vector< value_type >::iterator value_iterator
stxxl::swap_vector< external_array_type > external_arrays_type
type of external arrays vector
iterator::block_pointers_type block_pointers_type