libdap Updated for version 3.21.1
libdap4 is an implementation of OPeNDAP's DAP protocol.
XDRStreamMarshaller.cc
Go to the documentation of this file.
1// XDRStreamMarshaller.cc
2
3// -*- mode: c++; c-basic-offset:4 -*-
4
5// This file is part of libdap, A C++ implementation of the OPeNDAP Data
6// Access Protocol.
7
8// Copyright (c) 2002,2003,2016 OPeNDAP, Inc.
9// Author: Patrick West <pwest@ucar.edu>
10// James Gallagher <jgallagher@opendap.org>
11//
12// This library is free software; you can redistribute it and/or
13// modify it under the terms of the GNU Lesser General Public
14// License as published by the Free Software Foundation; either
15// version 2.1 of the License, or (at your option) any later version.
16//
17// This library is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20// Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public
23// License along with this library; if not, write to the Free Software
24// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25//
26// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
27
28// (c) COPYRIGHT URI/MIT 1994-1999
29// Please read the full copyright statement in the file COPYRIGHT_URI.
30//
31// Authors:
32// pwest Patrick West <pwest@ucar.edu>
33
34#include "config.h"
35
36#ifdef HAVE_PTHREAD_H
37#include <pthread.h>
38#endif
39
40#include <cassert>
41
42#include <iomanip>
43#include <iostream>
44#include <sstream>
45
46// #define DODS_DEBUG
47
48#include "XDRStreamMarshaller.h"
49#ifdef USE_POSIX_THREADS
50#include "MarshallerThread.h"
51#endif
52#include "Vector.h"
53#include "XDRUtils.h"
54#include "util.h"
55
56#include "DapIndent.h"
57#include "debug.h"
58
59using namespace std;
60
61// Build this code so it does not use pthreads to write some kinds of
62// data (see the put_vector() and put_vector_part() methods) in a child thread.
63// #undef USE_POSIX_THREADS
64
65namespace libdap {
66
67char *XDRStreamMarshaller::d_buf = 0;
68static const int XDR_DAP_BUFF_SIZE = 256;
69
78XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) : d_out(out), d_partial_put_byte_count(0), tm(0) {
79 if (!d_buf)
80 d_buf = (char *)malloc(XDR_DAP_BUFF_SIZE);
81 if (!d_buf)
82 throw Error(internal_error, "Failed to allocate memory for data serialization.");
83
84 xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
85
86#ifdef USE_POSIX_THREADS
87 tm = new MarshallerThread;
88#endif
89}
90
92 // Added this because when USE_POS... is not defined, 'tm' has no
93 // type, which the compiler complains about.
94#ifdef USE_POSIX_THREADS
95 delete tm;
96#endif
97 xdr_destroy(&d_sink);
98}
99
101 if (!xdr_setpos(&d_sink, 0))
102 throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
103
104 if (!xdr_char(&d_sink, (char *)&val))
105 throw Error("Network I/O Error. Could not send byte data.");
106
107 unsigned int bytes_written = xdr_getpos(&d_sink);
108 if (!bytes_written)
109 throw Error("Network I/O Error. Could not send byte data - unable to get stream position.");
110
111#ifdef USE_POSIX_THREADS
112 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
113#endif
114
115 d_out.write(d_buf, bytes_written);
116}
117
119 if (!xdr_setpos(&d_sink, 0))
120 throw Error("Network I/O Error. Could not send int 16 data - unable to set stream position.");
121
122 if (!XDR_INT16(&d_sink, &val))
123 throw Error("Network I/O Error. Could not send int 16 data.");
124
125 unsigned int bytes_written = xdr_getpos(&d_sink);
126 if (!bytes_written)
127 throw Error("Network I/O Error. Could not send int 16 data - unable to get stream position.");
128
129#ifdef USE_POSIX_THREADS
130 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
131#endif
132
133 d_out.write(d_buf, bytes_written);
134}
135
137 if (!xdr_setpos(&d_sink, 0))
138 throw Error("Network I/O Error. Could not send int 32 data - unable to set stream position.");
139
140 if (!XDR_INT32(&d_sink, &val))
141 throw Error("Network I/O Error. Culd not read int 32 data.");
142
143 unsigned int bytes_written = xdr_getpos(&d_sink);
144 if (!bytes_written)
145 throw Error("Network I/O Error. Could not send int 32 data - unable to get stream position.");
146
147#ifdef USE_POSIX_THREADS
148 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
149#endif
150
151 d_out.write(d_buf, bytes_written);
152}
153
155 if (!xdr_setpos(&d_sink, 0))
156 throw Error("Network I/O Error. Could not send float 32 data - unable to set stream position.");
157
158 if (!xdr_float(&d_sink, &val))
159 throw Error("Network I/O Error. Could not send float 32 data.");
160
161 unsigned int bytes_written = xdr_getpos(&d_sink);
162 if (!bytes_written)
163 throw Error("Network I/O Error. Could not send float 32 data - unable to get stream position.");
164
165#ifdef USE_POSIX_THREADS
166 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
167#endif
168
169 d_out.write(d_buf, bytes_written);
170}
171
173 if (!xdr_setpos(&d_sink, 0))
174 throw Error("Network I/O Error. Could not send float 64 data - unable to set stream position.");
175
176 if (!xdr_double(&d_sink, &val))
177 throw Error("Network I/O Error. Could not send float 64 data.");
178
179 unsigned int bytes_written = xdr_getpos(&d_sink);
180 if (!bytes_written)
181 throw Error("Network I/O Error. Could not send float 64 data - unable to get stream position.");
182
183#ifdef USE_POSIX_THREADS
184 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
185#endif
186
187 d_out.write(d_buf, bytes_written);
188}
189
191 if (!xdr_setpos(&d_sink, 0))
192 throw Error("Network I/O Error. Could not send uint 16 data - unable to set stream position.");
193
194 if (!XDR_UINT16(&d_sink, &val))
195 throw Error("Network I/O Error. Could not send uint 16 data.");
196
197 unsigned int bytes_written = xdr_getpos(&d_sink);
198 if (!bytes_written)
199 throw Error("Network I/O Error. Could not send uint 16 data - unable to get stream position.");
200
201#ifdef USE_POSIX_THREADS
202 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
203#endif
204
205 d_out.write(d_buf, bytes_written);
206}
207
209 if (!xdr_setpos(&d_sink, 0))
210 throw Error("Network I/O Error. Could not send uint 32 data - unable to set stream position.");
211
212 if (!XDR_UINT32(&d_sink, &val))
213 throw Error("Network I/O Error. Could not send uint 32 data.");
214
215 unsigned int bytes_written = xdr_getpos(&d_sink);
216 if (!bytes_written)
217 throw Error("Network I/O Error. Could not send uint 32 data - unable to get stream position.");
218
219#ifdef USE_POSIX_THREADS
220 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
221#endif
222
223 d_out.write(d_buf, bytes_written);
224}
225
226void XDRStreamMarshaller::put_str(const string &val) {
227 int size = val.length() + 8;
228
229 XDR str_sink;
230 vector<char> str_buf(size);
231
232 try {
233 xdrmem_create(&str_sink, str_buf.data(), size, XDR_ENCODE);
234
235 if (!xdr_setpos(&str_sink, 0))
236 throw Error("Network I/O Error. Could not send string data - unable to set stream position.");
237
238 const char *out_tmp = val.c_str();
239 if (!xdr_string(&str_sink, (char **)&out_tmp, size))
240 throw Error("Network I/O Error. Could not send string data.");
241
242 unsigned int bytes_written = xdr_getpos(&str_sink);
243 if (!bytes_written)
244 throw Error("Network I/O Error. Could not send string data - unable to get stream position.");
245
246#ifdef USE_POSIX_THREADS
247 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
248#endif
249
250 d_out.write(str_buf.data(), bytes_written);
251
252 xdr_destroy(&str_sink);
253 } catch (...) {
254 xdr_destroy(&str_sink);
255 throw;
256 }
257}
258
259void XDRStreamMarshaller::put_url(const string &val) { put_str(val); }
260
261void XDRStreamMarshaller::put_opaque(char *val, unsigned int len) {
262 if (len > XDR_DAP_BUFF_SIZE)
263 throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
264
265 if (!xdr_setpos(&d_sink, 0))
266 throw Error("Network I/O Error. Could not send opaque data - unable to set stream position.");
267
268 if (!xdr_opaque(&d_sink, val, len))
269 throw Error("Network I/O Error. Could not send opaque data.");
270
271 unsigned int bytes_written = xdr_getpos(&d_sink);
272 if (!bytes_written)
273 throw Error("Network I/O Error. Could not send opaque data - unable to get stream position.");
274
275#ifdef USE_POSIX_THREADS
276 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
277#endif
278
279 d_out.write(d_buf, bytes_written);
280}
281
283 if (!xdr_setpos(&d_sink, 0))
284 throw Error("Network I/O Error. Could not send int data - unable to set stream position.");
285
286 if (!xdr_int(&d_sink, &val))
287 throw Error("Network I/O Error(1). Could not send int data.");
288
289 unsigned int bytes_written = xdr_getpos(&d_sink);
290 if (!bytes_written)
291 throw Error("Network I/O Error. Could not send int data - unable to get stream position.");
292
293#ifdef USE_POSIX_THREADS
294 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
295#endif
296
297 d_out.write(d_buf, bytes_written);
298}
299
300void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec) {
301 put_vector(val, num, width, vec.var()->type());
302}
303
312 put_int(num);
313 put_int(num);
314
315 d_partial_put_byte_count = 0;
316}
317
325#ifdef USE_POSIX_THREADS
326 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
327#endif
328
329 // Compute the trailing (padding) bytes
330
331 // Note that the XDR standard pads values to 4 byte boundaries.
332 // unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
333 unsigned int mod_4 = d_partial_put_byte_count & 0x03;
334 unsigned int pad = (mod_4 == 0) ? 0 : 4 - mod_4;
335
336 if (pad) {
337 vector<char> padding(4, 0); // 4 zeros
338
339 d_out.write(padding.data(), pad);
340 if (d_out.fail())
341 throw Error("Network I/O Error. Could not send vector data padding");
342 }
343}
344
345// Start of parallel I/O support. jhrg 8/19/15
346void XDRStreamMarshaller::put_vector(char *val, int num, Vector &) {
347 if (!val)
348 throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
349
350 // write the number of members of the array being written and then set the position to 0
351 put_int(num);
352
353 // this is the word boundary for writing xdr bytes in a vector.
354 const unsigned int add_to = 8;
355 // switch to memory on the heap since the thread will need to access it
356 // after this code returns.
357 char *byte_buf = new char[num + add_to];
358 XDR byte_sink;
359 try {
360 xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
361 if (!xdr_setpos(&byte_sink, 0))
362 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
363
364 if (!xdr_bytes(&byte_sink, (char **)&val, (unsigned int *)&num, num + add_to))
365 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
366
367 unsigned int bytes_written = xdr_getpos(&byte_sink);
368 if (!bytes_written)
369 throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
370
371#ifdef USE_POSIX_THREADS
372 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
373 tm->increment_child_thread_count();
374 tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
375 xdr_destroy(&byte_sink);
376#else
377 d_out.write(byte_buf, bytes_written);
378 xdr_destroy(&byte_sink);
379 delete[] byte_buf;
380#endif
381
382 } catch (...) {
383 DBG(cerr << "Caught an exception in put_vector_thread" << endl);
384 xdr_destroy(&byte_sink);
385 delete[] byte_buf;
386 throw;
387 }
388}
389
390// private
401void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type) {
402 assert(val || num == 0);
403
404 // write the number of array members being written, then set the position back to 0
405 put_int(num);
406
407 if (num == 0)
408 return;
409
410 int use_width = width;
411 if (use_width < 4)
412 use_width = 4;
413
414 // the size is the number of elements num times the width of each
415 // element, then add 4 bytes for the number of elements
416 int size = (num * use_width) + 4;
417
418 // allocate enough memory for the elements
419 // vector<char> vec_buf(size);
420 char *vec_buf = new char[size];
421 XDR vec_sink;
422 try {
423 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
424
425 // set the position of the sink to 0, we're starting at the beginning
426 if (!xdr_setpos(&vec_sink, 0))
427 throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
428
429 // write the array to the buffer
430 if (!xdr_array(&vec_sink, (char **)&val, (unsigned int *)&num, size, width, XDRUtils::xdr_coder(type)))
431 throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
432
433 // how much was written to the buffer
434 unsigned int bytes_written = xdr_getpos(&vec_sink);
435 if (!bytes_written)
436 throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
437
438#ifdef USE_POSIX_THREADS
439 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
441 tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
442 xdr_destroy(&vec_sink);
443#else
444 d_out.write(vec_buf, bytes_written);
445 xdr_destroy(&vec_sink);
446 delete[] vec_buf;
447#endif
448 } catch (...) {
449 xdr_destroy(&vec_sink);
450 delete[] vec_buf;
451 throw;
452 }
453}
454
466void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type) {
467 if (width == 1) {
468 // Add space for the 4 bytes of length info and 4 bytes for padding, even though
469 // we will not send either of those.
470 const unsigned int add_to = 8;
471 unsigned int bufsiz = num + add_to;
472 // vector<char> byte_buf(bufsiz);
473 char *byte_buf = new char[bufsiz];
474 XDR byte_sink;
475 try {
476 xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
477 if (!xdr_setpos(&byte_sink, 0))
478 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
479
480 if (!xdr_bytes(&byte_sink, (char **)&val, (unsigned int *)&num, bufsiz))
481 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
482
483#ifdef USE_POSIX_THREADS
484 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
485 tm->increment_child_thread_count();
486
487 // Increment the element count so we can figure out about the padding in put_vector_last()
488 d_partial_put_byte_count += num;
489
490 tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
491 xdr_destroy(&byte_sink);
492#else
493 // Only send the num bytes that follow the 4 bytes of length info - we skip the
494 // length info because it's already been sent and we don't send any trailing padding
495 // bytes in this method (see put_vector_last() for that).
496 d_out.write(byte_buf + 4, num);
497
498 if (d_out.fail())
499 throw Error("Network I/O Error. Could not send initial part of byte vector data");
500
501 // Now increment the element count so we can figure out about the padding in put_vector_last()
502 d_partial_put_byte_count += num;
503
504 xdr_destroy(&byte_sink);
505 delete[] byte_buf;
506#endif
507 } catch (...) {
508 xdr_destroy(&byte_sink);
509 delete[] byte_buf;
510 throw;
511 }
512 } else {
513 int use_width = (width < 4) ? 4 : width;
514
515 // the size is the number of elements num times the width of each
516 // element, then add 4 bytes for the (int) number of elements
517 int size = (num * use_width) + 4;
518
519 // allocate enough memory for the elements
520 // vector<char> vec_buf(size);
521 char *vec_buf = new char[size];
522 XDR vec_sink;
523 try {
524 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
525
526 // set the position of the sink to 0, we're starting at the beginning
527 if (!xdr_setpos(&vec_sink, 0))
528 throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
529
530 // write the array to the buffer
531 if (!xdr_array(&vec_sink, (char **)&val, (unsigned int *)&num, size, width, XDRUtils::xdr_coder(type)))
532 throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
533
534#ifdef USE_POSIX_THREADS
535 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
536 tm->increment_child_thread_count();
537
538 // Increment the element count so we can figure out about the padding in put_vector_last()
539 d_partial_put_byte_count += (size - 4);
540 tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
541 xdr_destroy(&vec_sink);
542#else
543 // write that much out to the output stream, skipping the length data that
544 // XDR writes since we have already written the length info using put_vector_start()
545 d_out.write(vec_buf + 4, size - 4);
546
547 if (d_out.fail())
548 throw Error("Network I/O Error. Could not send part of vector data");
549
550 // Now increment the element count so we can figure out about the padding in put_vector_last()
551 d_partial_put_byte_count += (size - 4);
552
553 xdr_destroy(&vec_sink);
554 delete[] vec_buf;
555#endif
556 } catch (...) {
557 xdr_destroy(&vec_sink);
558 delete[] vec_buf;
559 throw;
560 }
561 }
562}
563
564void XDRStreamMarshaller::dump(ostream &strm) const {
565 strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *)this << ")" << endl;
566}
567
568} // namespace libdap
#define internal_error
Internal server error (500)
Definition Error.h:63
virtual Type type() const
Returns the type of the class instance.
Definition BaseType.cc:329
static ostream & LMarg(ostream &strm)
Definition DapIndent.cc:61
A class for error processing.
Definition Error.h:92
A class for software fault reporting.
Definition InternalErr.h:61
static void * write_thread(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written)
pthread_mutex_t & get_mutex()
static void * write_thread_part(void *arg)
pthread_cond_t & get_cond()
Holds a one-dimensional collection of DAP2 data types.
Definition Vector.h:81
BaseType * var(const string &name="", bool exact_match=true, btp_stack *s=nullptr) override
Definition Vector.cc:469
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
virtual void put_float64(dods_float64 val)
virtual void put_str(const string &val)
virtual void put_int16(dods_int16 val)
virtual void put_url(const string &val)
virtual void put_float32(dods_float32 val)
virtual void put_uint16(dods_uint16 val)
virtual void put_opaque(char *val, unsigned int len)
virtual void put_int32(dods_int32 val)
virtual void put_vector_start(int num)
virtual void put_uint32(dods_uint32 val)
virtual void put_byte(dods_byte val)
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Definition XDRUtils.cc:137
#define XDR_UINT16
Definition config.h:1128
#define XDR_INT16
Definition config.h:1122
#define XDR_INT32
Definition config.h:1125
#define XDR_UINT32
Definition config.h:1131
#define DBG(x)
Definition debug.h:58
top level DAP object to house generic methods
Definition AISConnect.cc:30
Type
Identifies the data type.
Definition Type.h:94
uint32_t dods_uint32
uint16_t dods_uint16