STXXL  1.4.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
request_queue_impl_qwqr.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * lib/io/request_queue_impl_qwqr.cpp
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2008, 2009 Andreas Beckmann <[email protected]>
8  * Copyright (C) 2009 Johannes Singler <[email protected]>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #include <algorithm>
17 
21 #include <stxxl/bits/parallel.h>
22 
23 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
24  #include <windows.h>
25 #endif
26 
27 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
28 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
29 #endif
30 
32 
33 struct file_offset_match : public std::binary_function<request_ptr, request_ptr, bool>
34 {
35  bool operator () (
36  const request_ptr& a,
37  const request_ptr& b) const
38  {
39  // matching file and offset are enough to cause problems
40  return (a->get_offset() == b->get_offset()) &&
41  (a->get_file() == b->get_file());
42  }
43 };
44 
45 request_queue_impl_qwqr::request_queue_impl_qwqr(int n)
46  : m_thread_state(NOT_RUNNING), sem(0)
47 {
48  STXXL_UNUSED(n);
49  start_thread(worker, static_cast<void*>(this), thread, m_thread_state);
50 }
51 
53 {
54  if (req.empty())
55  STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
56  if (m_thread_state() != RUNNING)
57  STXXL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
58 
59  if (req.get()->get_type() == request::READ)
60  {
61 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
62  {
64  if (std::find_if(write_queue.begin(), write_queue.end(),
65  bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
66  != write_queue.end())
67  {
68  STXXL_ERRMSG("READ request submitted for a BID with a pending WRITE request");
69  }
70  }
71 #endif
73  read_queue.push_back(req);
74  }
75  else
76  {
77 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
78  {
80  if (std::find_if(read_queue.begin(), read_queue.end(),
81  bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
82  != read_queue.end())
83  {
84  STXXL_ERRMSG("WRITE request submitted for a BID with a pending READ request");
85  }
86  }
87 #endif
89  write_queue.push_back(req);
90  }
91 
92  sem++;
93 }
94 
96 {
97  if (req.empty())
98  STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
99  if (m_thread_state() != RUNNING)
100  STXXL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
101 
102  bool was_still_in_queue = false;
103  if (req.get()->get_type() == request::READ)
104  {
106  queue_type::iterator pos;
107  if ((pos = std::find(read_queue.begin(), read_queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != read_queue.end())
108  {
109  read_queue.erase(pos);
110  was_still_in_queue = true;
111  sem--;
112  }
113  }
114  else
115  {
117  queue_type::iterator pos;
118  if ((pos = std::find(write_queue.begin(), write_queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != write_queue.end())
119  {
120  write_queue.erase(pos);
121  was_still_in_queue = true;
122  sem--;
123  }
124  }
125 
126  return was_still_in_queue;
127 }
128 
130 {
132 }
133 
135 {
136  self* pthis = static_cast<self*>(arg);
137 
138  bool write_phase = true;
139  for ( ; ; )
140  {
141  pthis->sem--;
142 
143  if (write_phase)
144  {
145  scoped_mutex_lock WriteLock(pthis->write_mutex);
146  if (!pthis->write_queue.empty())
147  {
148  request_ptr req = pthis->write_queue.front();
149  pthis->write_queue.pop_front();
150 
151  WriteLock.unlock();
152 
153  //assert(req->nref() > 1);
154  req->serve();
155  }
156  else
157  {
158  WriteLock.unlock();
159 
160  pthis->sem++;
161 
162  if (pthis->_priority_op == WRITE)
163  write_phase = false;
164  }
165 
166  if (pthis->_priority_op == NONE
167  || pthis->_priority_op == READ)
168  write_phase = false;
169  }
170  else
171  {
172  scoped_mutex_lock ReadLock(pthis->read_mutex);
173 
174  if (!pthis->read_queue.empty())
175  {
176  request_ptr req = pthis->read_queue.front();
177  pthis->read_queue.pop_front();
178 
179  ReadLock.unlock();
180 
181  STXXL_VERBOSE2("queue: before serve request has " << req->nref() << " references ");
182  //assert(req->nref() > 1);
183  req->serve();
184  STXXL_VERBOSE2("queue: after serve request has " << req->nref() << " references ");
185  }
186  else
187  {
188  ReadLock.unlock();
189 
190  pthis->sem++;
191 
192  if (pthis->_priority_op == READ)
193  write_phase = true;
194  }
195 
196  if (pthis->_priority_op == NONE
197  || pthis->_priority_op == WRITE)
198  write_phase = true;
199  }
200 
201  // terminate if it has been requested and queues are empty
202  if (pthis->m_thread_state() == TERMINATING) {
203  if ((pthis->sem--) == 0)
204  break;
205  else
206  pthis->sem++;
207  }
208  }
209 
210  pthis->m_thread_state.set_to(TERMINATED);
211 
212 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
213  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
214  // request_queue_impl_worker.cpp. -tb
215  ExitThread(NULL);
216 #else
217  return NULL;
218 #endif
219 }
220 
222 // vim: et:ts=4:sw=4
void unlock()
unlock mutex hold prematurely
Definition: mutex.h:127
counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.h:112
Type * get() const
return the enclosed pointer.
Definition: counting_ptr.h:120
bool empty() const
test for a NULL pointer
Definition: counting_ptr.h:140
#define STXXL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with &quot;Error in [function] : [error_message]&quot;.
void stop_thread(thread_type &t, state< thread_state > &s, semaphore &sem)
#define STXXL_VERBOSE2(x)
Definition: verbose.h:107
Aquire a lock that&#39;s valid until the end of scope.
Definition: mutex.h:106
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:23
#define STXXL_ERRMSG(x)
Definition: verbose.h:79
request_type get_type() const
Definition: request.h:68
void start_thread(void *(*worker)(void *), void *arg, thread_type &t, state< thread_state > &s)
#define _STXXL_FORCE_SEQUENTIAL
Definition: parallel.h:51
ExtIterator find(ExtIterator begin, ExtIterator end, const EqualityComparable &value, int_type nbuffers=0)
External equivalent of std::find, see stxxl::find.
Definition: scan.h:274
virtual void serve()=0
#define STXXL_END_NAMESPACE
Definition: namespace.h:17