libdap  Updated for version 3.20.6
libdap4 is an implementation of OPeNDAP's DAP protocol.
XDRStreamMarshaller.cc
1 // XDRStreamMarshaller.cc
2 
3 // -*- mode: c++; c-basic-offset:4 -*-
4 
5 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
6 // Access Protocol.
7 
8 // Copyright (c) 2002,2003,2016 OPeNDAP, Inc.
9 // Author: Patrick West <pwest@ucar.edu>
10 // James Gallagher <jgallagher@opendap.org>
11 //
12 // This library is free software; you can redistribute it and/or
13 // modify it under the terms of the GNU Lesser General Public
14 // License as published by the Free Software Foundation; either
15 // version 2.1 of the License, or (at your option) any later version.
16 //
17 // This library is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 // Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public
23 // License along with this library; if not, write to the Free Software
24 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25 //
26 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
27 
28 // (c) COPYRIGHT URI/MIT 1994-1999
29 // Please read the full copyright statement in the file COPYRIGHT_URI.
30 //
31 // Authors:
32 // pwest Patrick West <pwest@ucar.edu>
33 
34 
35 #include "config.h"
36 
37 #ifdef HAVE_PTHREAD_H
38 #include <pthread.h>
39 #endif
40 
41 #include <cassert>
42 
43 #include <iostream>
44 #include <sstream>
45 #include <iomanip>
46 
47 // #define DODS_DEBUG
48 
49 #include "XDRStreamMarshaller.h"
50 #ifdef USE_POSIX_THREADS
51 #include "MarshallerThread.h"
52 #endif
53 #include "Vector.h"
54 #include "XDRUtils.h"
55 #include "util.h"
56 
57 #include "debug.h"
58 #include "DapIndent.h"
59 
60 using namespace std;
61 
62 // Build this code so it does not use pthreads to write some kinds of
63 // data (see the put_vector() and put_vector_part() methods) in a child thread.
64 // #undef USE_POSIX_THREADS
65 
66 namespace libdap {
67 
68 char *XDRStreamMarshaller::d_buf = 0;
69 static const int XDR_DAP_BUFF_SIZE=256;
70 
71 
80 XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
81  d_out(out), d_partial_put_byte_count(0), tm(0)
82 {
83  if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
84  if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");
85 
86  xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
87 
88 #ifdef USE_POSIX_THREADS
89  tm = new MarshallerThread;
90 #endif
91 }
92 
93 XDRStreamMarshaller::~XDRStreamMarshaller()
94 {
95  // Added this because when USE_POS... is not defined, 'tm' has no
96  // type, which the compiler complains about.
97 #ifdef USE_POSIX_THREADS
98  delete tm;
99 #endif
100  xdr_destroy(&d_sink);
101 }
102 
103 void XDRStreamMarshaller::put_byte(dods_byte val)
104 {
105  if (!xdr_setpos(&d_sink, 0))
106  throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
107 
108  if (!xdr_char(&d_sink, (char *) &val))
109  throw Error(
110  "Network I/O Error. Could not send byte data.");
111 
112  unsigned int bytes_written = xdr_getpos(&d_sink);
113  if (!bytes_written)
114  throw Error(
115  "Network I/O Error. Could not send byte data - unable to get stream position.");
116 
117 #ifdef USE_POSIX_THREADS
118  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
119 #endif
120 
121  d_out.write(d_buf, bytes_written);
122 }
123 
124 void XDRStreamMarshaller::put_int16(dods_int16 val)
125 {
126  if (!xdr_setpos(&d_sink, 0))
127  throw Error(
128  "Network I/O Error. Could not send int 16 data - unable to set stream position.");
129 
130  if (!XDR_INT16(&d_sink, &val))
131  throw Error(
132  "Network I/O Error. Could not send int 16 data.");
133 
134  unsigned int bytes_written = xdr_getpos(&d_sink);
135  if (!bytes_written)
136  throw Error(
137  "Network I/O Error. Could not send int 16 data - unable to get stream position.");
138 
139 #ifdef USE_POSIX_THREADS
140  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
141 #endif
142 
143  d_out.write(d_buf, bytes_written);
144 }
145 
146 void XDRStreamMarshaller::put_int32(dods_int32 val)
147 {
148  if (!xdr_setpos(&d_sink, 0))
149  throw Error(
150  "Network I/O Error. Could not send int 32 data - unable to set stream position.");
151 
152  if (!XDR_INT32(&d_sink, &val))
153  throw Error(
154  "Network I/O Error. Culd not read int 32 data.");
155 
156  unsigned int bytes_written = xdr_getpos(&d_sink);
157  if (!bytes_written)
158  throw Error(
159  "Network I/O Error. Could not send int 32 data - unable to get stream position.");
160 
161 #ifdef USE_POSIX_THREADS
162  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
163 #endif
164 
165  d_out.write(d_buf, bytes_written);
166 }
167 
168 void XDRStreamMarshaller::put_float32(dods_float32 val)
169 {
170  if (!xdr_setpos(&d_sink, 0))
171  throw Error(
172  "Network I/O Error. Could not send float 32 data - unable to set stream position.");
173 
174  if (!xdr_float(&d_sink, &val))
175  throw Error(
176  "Network I/O Error. Could not send float 32 data.");
177 
178  unsigned int bytes_written = xdr_getpos(&d_sink);
179  if (!bytes_written)
180  throw Error(
181  "Network I/O Error. Could not send float 32 data - unable to get stream position.");
182 
183 #ifdef USE_POSIX_THREADS
184  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
185 #endif
186 
187  d_out.write(d_buf, bytes_written);
188 }
189 
190 void XDRStreamMarshaller::put_float64(dods_float64 val)
191 {
192  if (!xdr_setpos(&d_sink, 0))
193  throw Error(
194  "Network I/O Error. Could not send float 64 data - unable to set stream position.");
195 
196  if (!xdr_double(&d_sink, &val))
197  throw Error(
198  "Network I/O Error. Could not send float 64 data.");
199 
200  unsigned int bytes_written = xdr_getpos(&d_sink);
201  if (!bytes_written)
202  throw Error(
203  "Network I/O Error. Could not send float 64 data - unable to get stream position.");
204 
205 #ifdef USE_POSIX_THREADS
206  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
207 #endif
208 
209  d_out.write(d_buf, bytes_written);
210 }
211 
212 void XDRStreamMarshaller::put_uint16(dods_uint16 val)
213 {
214  if (!xdr_setpos(&d_sink, 0))
215  throw Error(
216  "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
217 
218  if (!XDR_UINT16(&d_sink, &val))
219  throw Error(
220  "Network I/O Error. Could not send uint 16 data.");
221 
222  unsigned int bytes_written = xdr_getpos(&d_sink);
223  if (!bytes_written)
224  throw Error(
225  "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
226 
227 #ifdef USE_POSIX_THREADS
228  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
229 #endif
230 
231  d_out.write(d_buf, bytes_written);
232 }
233 
234 void XDRStreamMarshaller::put_uint32(dods_uint32 val)
235 {
236  if (!xdr_setpos(&d_sink, 0))
237  throw Error(
238  "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
239 
240  if (!XDR_UINT32(&d_sink, &val))
241  throw Error(
242  "Network I/O Error. Could not send uint 32 data.");
243 
244  unsigned int bytes_written = xdr_getpos(&d_sink);
245  if (!bytes_written)
246  throw Error(
247  "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
248 
249 #ifdef USE_POSIX_THREADS
250  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
251 #endif
252 
253  d_out.write(d_buf, bytes_written);
254 }
255 
256 void XDRStreamMarshaller::put_str(const string &val)
257 {
258  int size = val.length() + 8;
259 
260  XDR str_sink;
261  vector<char> str_buf(size);
262 
263  try {
264  xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
265 
266  if (!xdr_setpos(&str_sink, 0))
267  throw Error(
268  "Network I/O Error. Could not send string data - unable to set stream position.");
269 
270  const char *out_tmp = val.c_str();
271  if (!xdr_string(&str_sink, (char **) &out_tmp, size))
272  throw Error(
273  "Network I/O Error. Could not send string data.");
274 
275  unsigned int bytes_written = xdr_getpos(&str_sink);
276  if (!bytes_written)
277  throw Error(
278  "Network I/O Error. Could not send string data - unable to get stream position.");
279 
280 #ifdef USE_POSIX_THREADS
281  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
282 #endif
283 
284  d_out.write(&str_buf[0], bytes_written);
285 
286  xdr_destroy(&str_sink);
287  }
288  catch (...) {
289  xdr_destroy(&str_sink);
290  throw;
291  }
292 }
293 
294 void XDRStreamMarshaller::put_url(const string &val)
295 {
296  put_str(val);
297 }
298 
299 void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
300 {
301  if (len > XDR_DAP_BUFF_SIZE)
302  throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
303 
304  if (!xdr_setpos(&d_sink, 0))
305  throw Error(
306  "Network I/O Error. Could not send opaque data - unable to set stream position.");
307 
308  if (!xdr_opaque(&d_sink, val, len))
309  throw Error(
310  "Network I/O Error. Could not send opaque data.");
311 
312  unsigned int bytes_written = xdr_getpos(&d_sink);
313  if (!bytes_written)
314  throw Error(
315  "Network I/O Error. Could not send opaque data - unable to get stream position.");
316 
317 #ifdef USE_POSIX_THREADS
318  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
319 #endif
320 
321  d_out.write(d_buf, bytes_written);
322 }
323 
324 void XDRStreamMarshaller::put_int(int val)
325 {
326  if (!xdr_setpos(&d_sink, 0))
327  throw Error(
328  "Network I/O Error. Could not send int data - unable to set stream position.");
329 
330  if (!xdr_int(&d_sink, &val))
331  throw Error(
332  "Network I/O Error(1). Could not send int data.");
333 
334  unsigned int bytes_written = xdr_getpos(&d_sink);
335  if (!bytes_written)
336  throw Error(
337  "Network I/O Error. Could not send int data - unable to get stream position.");
338 
339 #ifdef USE_POSIX_THREADS
340  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
341 #endif
342 
343  d_out.write(d_buf, bytes_written);
344 }
345 
346 void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
347 {
348  put_vector(val, num, width, vec.var()->type());
349 }
350 
351 
360 {
361  put_int(num);
362  put_int(num);
363 
364  d_partial_put_byte_count = 0;
365 }
366 
374 {
375 #ifdef USE_POSIX_THREADS
376  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
377 #endif
378 
379  // Compute the trailing (padding) bytes
380 
381  // Note that the XDR standard pads values to 4 byte boundaries.
382  //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
383  unsigned int mod_4 = d_partial_put_byte_count & 0x03;
384  unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
385 
386  if (pad) {
387  vector<char> padding(4, 0); // 4 zeros
388 
389  d_out.write(&padding[0], pad);
390  if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
391  }
392 }
393 
394 // Start of parallel I/O support. jhrg 8/19/15
395 void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
396 {
397  if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
398 
399  // write the number of members of the array being written and then set the position to 0
400  put_int(num);
401 
402  // this is the word boundary for writing xdr bytes in a vector.
403  const unsigned int add_to = 8;
404  // switch to memory on the heap since the thread will need to access it
405  // after this code returns.
406  char *byte_buf = new char[num + add_to];
407  XDR byte_sink;
408  try {
409  xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
410  if (!xdr_setpos(&byte_sink, 0))
411  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
412 
413  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
414  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
415 
416  unsigned int bytes_written = xdr_getpos(&byte_sink);
417  if (!bytes_written)
418  throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
419 
420 #ifdef USE_POSIX_THREADS
421  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
422  tm->increment_child_thread_count();
423  tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
424  xdr_destroy(&byte_sink);
425 #else
426  d_out.write(byte_buf, bytes_written);
427  xdr_destroy(&byte_sink);
428  delete [] byte_buf;
429 #endif
430 
431  }
432  catch (...) {
433  DBG(cerr << "Caught an exception in put_vector_thread" << endl);
434  xdr_destroy(&byte_sink);
435  delete [] byte_buf;
436  throw;
437  }
438 }
439 
440 // private
451 void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
452 {
453  assert(val || num == 0);
454 
455  // write the number of array members being written, then set the position back to 0
456  put_int(num);
457 
458  if (num == 0)
459  return;
460 
461  int use_width = width;
462  if (use_width < 4) use_width = 4;
463 
464  // the size is the number of elements num times the width of each
465  // element, then add 4 bytes for the number of elements
466  int size = (num * use_width) + 4;
467 
468  // allocate enough memory for the elements
469  //vector<char> vec_buf(size);
470  char *vec_buf = new char[size];
471  XDR vec_sink;
472  try {
473  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
474 
475  // set the position of the sink to 0, we're starting at the beginning
476  if (!xdr_setpos(&vec_sink, 0))
477  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
478 
479  // write the array to the buffer
480  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
481  throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
482 
483  // how much was written to the buffer
484  unsigned int bytes_written = xdr_getpos(&vec_sink);
485  if (!bytes_written)
486  throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
487 
488 #ifdef USE_POSIX_THREADS
489  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
490  tm->increment_child_thread_count();
491  tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
492  xdr_destroy(&vec_sink);
493 #else
494  d_out.write(vec_buf, bytes_written);
495  xdr_destroy(&vec_sink);
496  delete [] vec_buf;
497 #endif
498  }
499  catch (...) {
500  xdr_destroy(&vec_sink);
501  delete [] vec_buf;
502  throw;
503  }
504 }
505 
517 void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
518 {
519  if (width == 1) {
520  // Add space for the 4 bytes of length info and 4 bytes for padding, even though
521  // we will not send either of those.
522  const unsigned int add_to = 8;
523  unsigned int bufsiz = num + add_to;
524  //vector<char> byte_buf(bufsiz);
525  char *byte_buf = new char[bufsiz];
526  XDR byte_sink;
527  try {
528  xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
529  if (!xdr_setpos(&byte_sink, 0))
530  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
531 
532  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
533  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
534 
535 #ifdef USE_POSIX_THREADS
536  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
537  tm->increment_child_thread_count();
538 
539  // Increment the element count so we can figure out about the padding in put_vector_last()
540  d_partial_put_byte_count += num;
541 
542  tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
543  xdr_destroy(&byte_sink);
544 #else
545  // Only send the num bytes that follow the 4 bytes of length info - we skip the
546  // length info because it's already been sent and we don't send any trailing padding
547  // bytes in this method (see put_vector_last() for that).
548  d_out.write(byte_buf + 4, num);
549 
550  if (d_out.fail())
551  throw Error ("Network I/O Error. Could not send initial part of byte vector data");
552 
553  // Now increment the element count so we can figure out about the padding in put_vector_last()
554  d_partial_put_byte_count += num;
555 
556  xdr_destroy(&byte_sink);
557  delete [] byte_buf;
558 #endif
559  }
560  catch (...) {
561  xdr_destroy(&byte_sink);
562  delete [] byte_buf;
563  throw;
564  }
565  }
566  else {
567  int use_width = (width < 4) ? 4 : width;
568 
569  // the size is the number of elements num times the width of each
570  // element, then add 4 bytes for the (int) number of elements
571  int size = (num * use_width) + 4;
572 
573  // allocate enough memory for the elements
574  //vector<char> vec_buf(size);
575  char *vec_buf = new char[size];
576  XDR vec_sink;
577  try {
578  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
579 
580  // set the position of the sink to 0, we're starting at the beginning
581  if (!xdr_setpos(&vec_sink, 0))
582  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
583 
584  // write the array to the buffer
585  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
586  throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
587 
588 #ifdef USE_POSIX_THREADS
589  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
590  tm->increment_child_thread_count();
591 
592  // Increment the element count so we can figure out about the padding in put_vector_last()
593  d_partial_put_byte_count += (size - 4);
594  tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
595  xdr_destroy(&vec_sink);
596 #else
597  // write that much out to the output stream, skipping the length data that
598  // XDR writes since we have already written the length info using put_vector_start()
599  d_out.write(vec_buf + 4, size - 4);
600 
601  if (d_out.fail())
602  throw Error ("Network I/O Error. Could not send part of vector data");
603 
604  // Now increment the element count so we can figure out about the padding in put_vector_last()
605  d_partial_put_byte_count += (size - 4);
606 
607  xdr_destroy(&vec_sink);
608  delete [] vec_buf;
609 #endif
610  }
611  catch (...) {
612  xdr_destroy(&vec_sink);
613  delete [] vec_buf;
614  throw;
615  }
616  }
617 }
618 
619 void XDRStreamMarshaller::dump(ostream &strm) const
620 {
621  strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
622 }
623 
624 } // namespace libdap
625 
static void * write_thread(void *arg)
Holds a one-dimensional collection of DAP2 data types.
Definition: Vector.h:80
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
STL namespace.
virtual void put_vector_start(int num)
Type
Identifies the data type.
Definition: Type.h:94
top level DAP object to house generic methods
Definition: AISConnect.cc:30
A class for software fault reporting.
Definition: InternalErr.h:64
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
virtual BaseType * var(const string &name="", bool exact_match=true, btp_stack *s=0)
Definition: Vector.cc:433
virtual Type type() const
Returns the type of the class instance.
Definition: BaseType.cc:365
static void * write_thread_part(void *arg)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Definition: XDRUtils.cc:145
A class for error processing.
Definition: Error.h:92