42 #include "MarshallerThread.h" 44 #include "InternalErr.h" 51 bool MarshallerThread::print_time =
false;
58 static double time_diff_to_hundredths(
struct timeval *stop,
struct timeval *start)
61 if (stop->tv_usec < start->tv_usec) {
62 int nsec = (start->tv_usec - stop->tv_usec) / 1000000 + 1;
63 start->tv_usec -= 1000000 * nsec;
64 start->tv_sec += nsec;
66 if (stop->tv_usec - start->tv_usec > 1000000) {
67 int nsec = (start->tv_usec - stop->tv_usec) / 1000000;
68 start->tv_usec += 1000000 * nsec;
69 start->tv_sec -= nsec;
72 double result = stop->tv_sec - start->tv_sec;
73 result += double(stop->tv_usec - start->tv_usec) / 1000000;
88 Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond,
int &count) :
91 int status = pthread_mutex_lock(&m_mutex);
93 DBG(cerr <<
"Locking the mutex! (waiting; " << pthread_self() <<
")" << endl);
95 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not lock m_mutex");
97 status = pthread_cond_wait(&cond, &m_mutex);
98 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not wait on m_cond");
100 if (count != 0)
throw InternalErr(__FILE__, __LINE__,
"FAIL: left m_cond wait with non-zero child thread count");
102 DBG(cerr <<
"Locked! (" << pthread_self() <<
")" << endl);
110 DBG(cerr <<
"Unlocking the mutex! (" << pthread_self() <<
")" << endl);
112 (void) pthread_mutex_unlock(&m_mutex);
114 int status = pthread_mutex_unlock(&m_mutex);
115 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not unlock m_mutex");
133 ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond,
int &count) :
134 m_mutex(lock), m_cond(cond), m_count(count)
136 int status = pthread_mutex_lock(&m_mutex);
138 DBG(cerr <<
"Locking the mutex! (simple; " << pthread_self() <<
")" << endl);
140 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not lock m_mutex");
142 DBG(cerr <<
"Locked! (" << pthread_self() <<
")" << endl);
145 ChildLocker::~ChildLocker()
147 DBG(cerr <<
"Unlocking the mutex! (" << pthread_self() <<
")" << endl);
151 (void) pthread_cond_signal(&m_cond);
152 (void) pthread_mutex_unlock(&m_mutex);
155 int status = pthread_cond_signal(&m_cond);
157 throw InternalErr(__FILE__, __LINE__,
"Could not signal main thread from ChildLocker!");
159 status = pthread_mutex_unlock(&m_mutex);
160 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not unlock m_mutex");
164 MarshallerThread::MarshallerThread() :
165 d_thread(0), d_child_thread_count(0)
167 if (pthread_attr_init(&d_thread_attr) != 0)
throw Error(internal_error,
"Failed to initialize pthread attributes.");
168 if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED ) != 0)
169 throw Error(internal_error,
"Failed to complete pthread attribute initialization.");
171 if (pthread_mutex_init(&d_out_mutex, 0) != 0)
throw Error(internal_error,
"Failed to initialize mutex.");
172 if (pthread_cond_init(&d_out_cond, 0) != 0)
throw Error(internal_error,
"Failed to initialize cond.");
175 MarshallerThread::~MarshallerThread()
177 (void) pthread_mutex_lock(&d_out_mutex);
179 int status = pthread_mutex_lock(&d_out_mutex);
180 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not lock m_mutex");
184 if (d_child_thread_count != 0) {
185 (void) pthread_cond_wait(&d_out_cond, &d_out_mutex);
186 d_child_thread_count = 0;
188 status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
189 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not wait on m_cond");
193 (void) pthread_mutex_unlock(&d_out_mutex);
196 if (d_child_thread_count != 0)
197 throw InternalErr(__FILE__, __LINE__,
"FAIL: left m_cond wait with non-zero child thread count");
199 status = pthread_mutex_unlock(&d_out_mutex);
200 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not unlock m_mutex");
203 pthread_mutex_destroy(&d_out_mutex);
204 pthread_cond_destroy(&d_out_cond);
206 pthread_attr_destroy(&d_thread_attr);
218 write_args *args =
new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
220 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
221 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not start child thread");
229 write_args *args =
new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
231 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
232 if (status != 0)
throw InternalErr(__FILE__, __LINE__,
"Could not start child thread");
247 write_args *args =
reinterpret_cast<write_args *
>(arg);
249 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count);
253 if (print_time && gettimeofday(&tp_s, 0) != 0) cerr <<
"could not read time" << endl;
259 if (args->d_out_file != -1) {
260 int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
261 if (bytes_written != args->d_num)
265 args->d_out.write(args->d_buf, args->d_num);
266 if (args->d_out.fail()) {
268 oss <<
"Could not write data: " << __FILE__ <<
":" << __LINE__;
269 args->d_error = oss.str();
274 delete [] args->d_buf;
280 if (gettimeofday(&tp_e, 0) != 0) cerr <<
"could not read time" << endl;
282 cerr <<
"time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
304 write_args *args =
reinterpret_cast<write_args *
>(arg);
306 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count);
308 if (args->d_out_file != -1) {
309 int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
310 if (bytes_written != args->d_num)
return (
void*) -1;
313 args->d_out.write(args->d_buf + 4, args->d_num);
314 if (args->d_out.fail()) {
316 oss <<
"Could not write data: " << __FILE__ <<
":" << __LINE__;
317 args->d_error = oss.str();
322 delete [] args->d_buf;
static void * write_thread(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
top level DAP object to house generic methods
A class for software fault reporting.
static void * write_thread_part(void *arg)
A class for error processing.