// Out-of-class definition of MarshallerThread::print_time, initialized to
// false. Further down this file it gates a gettimeofday() call made before
// data is written.
54bool MarshallerThread::print_time =
false;
// Computes the elapsed time (stop - start) in seconds as a double from the
// tv_sec and tv_usec fields of two timevals.
//
// NOTE(review): despite the name, no rounding to hundredths is visible here.
// NOTE(review): the carry steps below write through `start`, so the caller's
// struct is modified as a side effect.
// NOTE(review): this listing appears to have lost lines (closing braces and
// the return statement are not visible) - confirm against the real file.
61static double time_diff_to_hundredths(
struct timeval *stop,
struct timeval *start) {
// Borrow whole seconds into start->tv_sec so that the microsecond difference
// computed at the end is not negative.
63 if (stop->tv_usec < start->tv_usec) {
64 int nsec = (start->tv_usec - stop->tv_usec) / 1000000 + 1;
65 start->tv_usec -= 1000000 * nsec;
66 start->tv_sec += nsec;
// NOTE(review): nsec is computed here as (start - stop), which is negative
// whenever this condition holds; the glibc manual's timeval_subtract example
// uses the opposite operand order at this point. Both adjustments keep the
// total time represented by *start unchanged, so the final difference is not
// affected - confirm the intent.
68 if (stop->tv_usec - start->tv_usec > 1000000) {
69 int nsec = (start->tv_usec - stop->tv_usec) / 1000000;
70 start->tv_usec += 1000000 * nsec;
71 start->tv_sec -= nsec;
// Whole seconds first, then the microsecond remainder scaled to seconds.
74 double result = stop->tv_sec - start->tv_sec;
75 result += double(stop->tv_usec - start->tv_usec) / 1000000;
// Locker constructor: initializes m_mutex from `lock`, locks it, and calls
// pthread_cond_wait() on `cond` with that mutex. Failures are reported by
// throwing InternalErr with the messages below.
//
// NOTE(review): the conditions guarding the two throws and the wait are not
// visible in this listing, and `count` is not used in any visible line -
// confirm against the real file.
89Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond,
int &count) : m_mutex(lock) {
90 int status = pthread_mutex_lock(&m_mutex);
92 DBG(cerr <<
"Locking the mutex! (waiting; " << pthread_self() <<
")" << endl);
95 throw InternalErr(__FILE__, __LINE__,
"Could not lock m_mutex");
// pthread_cond_wait() releases m_mutex while blocked and re-acquires it
// before returning.
97 status = pthread_cond_wait(&cond, &m_mutex);
99 throw InternalErr(__FILE__, __LINE__,
"Could not wait on m_cond");
102 DBG(cerr <<
"Locked! (" << pthread_self() <<
")" << endl);
// NOTE(review): the function header for this fragment is not visible in this
// listing. The visible statements write a DBG trace and then unlock m_mutex;
// the (void) cast discards pthread_mutex_unlock()'s return value, so an
// unlock failure is not reported.
109 DBG(cerr <<
"Unlocking the mutex! (" << pthread_self() <<
")" << endl);
111 (void)pthread_mutex_unlock(&m_mutex);
// Constructor fragment (the name and parameter list are not visible in this
// listing): initializes m_mutex, m_cond and m_count from lock, cond and
// count, then locks m_mutex. Unlike the Locker constructor, no
// pthread_cond_wait() call is visible here.
// NOTE(review): the condition guarding the throw is not visible.
129 : m_mutex(lock), m_cond(cond), m_count(count) {
130 int status = pthread_mutex_lock(&m_mutex);
132 DBG(cerr <<
"Locking the mutex! (simple; " << pthread_self() <<
")" << endl);
135 throw InternalErr(__FILE__, __LINE__,
"Could not lock m_mutex");
137 DBG(cerr <<
"Locked! (" << pthread_self() <<
")" << endl);
// NOTE(review): the function header for this fragment is not visible, and the
// embedded line numbers jump from 141 to 145, so statements may be missing
// between the trace and the signal.
141 DBG(cerr <<
"Unlocking the mutex! (" << pthread_self() <<
")" << endl);
// Signal m_cond and then unlock m_mutex; both return values are discarded.
145 (void)pthread_cond_signal(&m_cond);
146 (void)pthread_mutex_unlock(&m_mutex);
// Initialization checks. The enclosing function header and the statements
// executed when a check fails are not visible in this listing.
// Thread attributes are initialized and set to PTHREAD_CREATE_DETACHED.
150 if (pthread_attr_init(&d_thread_attr) != 0)
152 if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED ) != 0)
// The mutex and condition variable are created with default attributes
// (nullptr attribute argument).
155 if (pthread_mutex_init(&d_out_mutex,
nullptr) != 0)
157 if (pthread_cond_init(&d_out_cond,
nullptr) != 0)
// Teardown fragment (function header not visible in this listing): takes
// d_out_mutex, waits on d_out_cond if d_child_thread_count is non-zero,
// then releases and destroys the synchronization objects. All pthread return
// values here are ignored.
162 (void)pthread_mutex_lock(&d_out_mutex);
// NOTE(review): the wait is guarded by `if`, not a loop, so the count is not
// re-checked after pthread_cond_wait() returns; POSIX allows spurious wakeups.
// The count is then set to 0 directly.
165 if (d_child_thread_count != 0) {
166 (void)pthread_cond_wait(&d_out_cond, &d_out_mutex);
167 d_child_thread_count = 0;
170 (void)pthread_mutex_unlock(&d_out_mutex);
172 pthread_mutex_destroy(&d_out_mutex);
173 pthread_cond_destroy(&d_out_cond);
175 pthread_attr_destroy(&d_thread_attr);
// Fragment (function header not visible in this listing): heap-allocates a
// write_args bundle from the mutex, condition variable, child-thread count,
// d_thread_error, the output stream `out`, the buffer and its byte count,
// then starts `thread` with that bundle via pthread_create() using
// d_thread_attr.
185 auto *args =
new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf, bytes);
186 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
// NOTE(review): the condition guarding this throw is not visible, and no
// delete of `args` is visible on this path - possible leak; confirm.
188 throw InternalErr(__FILE__, __LINE__,
"Could not start child thread");
// Fragment (function header not visible in this listing): same pattern as the
// stream-based variant, but the write_args bundle is built with `fd` in place
// of an output stream before `thread` is started via pthread_create().
195 auto *args =
new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf, bytes);
196 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
// NOTE(review): the condition guarding this throw is not visible, and no
// delete of `args` is visible on this path - possible leak; confirm.
198 throw InternalErr(__FILE__, __LINE__,
"Could not start child thread");
// Thread-routine fragment; the function header is not visible in this listing.
// Recovers a write_args pointer from the void* `arg` - presumably the bundle
// allocated before pthread_create(); confirm against the caller.
// NOTE(review): a static_cast is sufficient for converting void* to an object
// pointer; reinterpret_cast is not required here.
211 auto *args =
reinterpret_cast<write_args *
>(arg);
// Constructing the ChildLocker locks args->d_mutex (see the constructor above).
213 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count);
// The start time is only sampled when print_time is set; a gettimeofday()
// failure is reported on cerr and otherwise ignored.
217 if (print_time && gettimeofday(&tp_s, 0) != 0)
218 cerr <<
"could not read time" << endl;
// A d_out_file other than -1 selects write() on that descriptor.
// NOTE(review): the statement run on a short write and the `else` that
// presumably introduces the stream branch are not visible in this listing.
224 if (args->d_out_file != -1) {
225 auto bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
226 if (bytes_written != args->d_num)
229 args->d_out.write(args->d_buf, args->d_num);
// A stream failure is recorded as a message in args->d_error.
230 if (args->d_out.fail()) {
232 oss <<
"Could not write data: " << __FILE__ <<
":" << __LINE__;
233 args->d_error = oss.str();
// This routine frees the data buffer with delete[]; no delete of `args`
// itself is visible in this listing - confirm.
238 delete[] args->d_buf;
// NOTE(review): no print_time guard is visible around the end-time sample or
// the report below; the listing skips lines here - confirm.
244 if (gettimeofday(&tp_e, 0) != 0)
245 cerr <<
"could not read time" << endl;
247 cerr <<
"time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
// Thread-routine fragment; the function header is not visible in this listing.
// Same overall shape as the fragment above, without the timing code.
267 auto *args =
reinterpret_cast<write_args *
>(arg);
// Constructing the ChildLocker locks args->d_mutex (see the constructor above).
269 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count);
// NOTE(review): the descriptor branch writes d_num bytes starting at d_buf,
// while the stream call below writes d_num bytes starting at d_buf + 4. The
// two paths therefore emit different byte ranges - confirm which is intended.
271 if (args->d_out_file != -1) {
272 auto bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
273 if (bytes_written != args->d_num)
276 args->d_out.write(args->d_buf + 4, args->d_num);
// A stream failure is recorded as a message in args->d_error.
277 if (args->d_out.fail()) {
279 oss <<
"Could not write data: " << __FILE__ <<
":" << __LINE__;
280 args->d_error = oss.str();
// The data buffer is freed from its original base pointer (not d_buf + 4).
285 delete[] args->d_buf;
#define internal_error
Internal server error (500)
A class for error processing.
A class for software fault reporting.
static void * write_thread(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written)
virtual ~MarshallerThread()
static void * write_thread_part(void *arg)
top level DAP object to house generic methods