bes Updated for version 3.21.1
The Backend Server (BES) is the lower two tiers of the Hyrax data server
Chunk.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of the BES
4
5// Copyright (c) 2016 OPeNDAP, Inc.
6// Author: Nathan Potter <ndp@opendap.org>
7//
8// This library is free software; you can redistribute it and/or
9// modify it under the terms of the GNU Lesser General Public
10// License as published by the Free Software Foundation; either
11// version 2.1 of the License, or (at your option) any later version.
12//
13// This library is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16// Lesser General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public
19// License along with this library; if not, write to the Free Software
20// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21//
22// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23
24#include "config.h"
25
26#include <sstream>
27#include <cstring>
28
29#include <zlib.h>
30
31#include <BESDebug.h>
32#include <BESLog.h>
33#include <BESInternalError.h>
34#include <BESSyntaxUserError.h>
35#include <BESForbiddenError.h>
36#include <BESContextManager.h>
37#include <BESUtil.h>
38
39#define PUGIXML_NO_XPATH
40#define PUGIXML_HEADER_ONLY
41#include <pugixml.hpp>
42
43#include "Chunk.h"
44#include "CurlUtils.h"
45#include "CurlHandlePool.h"
46#include "EffectiveUrlCache.h"
47#include "DmrppRequestHandler.h"
48#include "DmrppNames.h"
49#include "byteswap_compat.h"
50#include "float_byteswap.h"
51
52using namespace std;
54
55#define prolog std::string("Chunk::").append(__func__).append("() - ")
56
57#define FLETCHER32_CHECKSUM 4 // Bytes in the fletcher32 checksum
58#define ACTUALLY_USE_FLETCHER32_CHECKSUM 1 // Computing checksums takes time...
59
60namespace dmrpp {
61
74size_t chunk_header_callback(char *buffer, size_t /*size*/, size_t nitems, void *data) {
75 // received header is nitems * size long in 'buffer' NOT ZERO TERMINATED
76 // 'userdata' is set with CURLOPT_HEADERDATA
77 // 'size' is always 1
78
79 // -2 strips of the CRLF at the end of the header
80 string header(buffer, buffer + nitems - 2);
81
82 // Look for the content type header and store its value in the Chunk
83 if (header.find("Content-Type") != string::npos) {
84 // Header format 'Content-Type: <value>'
85 auto c_ptr = reinterpret_cast<Chunk *>(data);
86 c_ptr->set_response_content_type(header.substr(header.find_last_of(' ') + 1));
87 }
88
89 return nitems;
90}
91
97void process_s3_error_response(const shared_ptr<http::url> &data_url, const string &xml_message)
98{
99 // See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
100 // for the low-down on this XML document.
101 pugi::xml_document error;
102 pugi::xml_parse_result result = error.load_string(xml_message.c_str());
103 if (!result)
104 throw BESInternalError("The underlying data store returned an unintelligible error message.", __FILE__, __LINE__);
105
106 pugi::xml_node err_elmnt = error.document_element();
107 if (!err_elmnt || (strcmp(err_elmnt.name(), "Error") != 0))
108 throw BESInternalError("The underlying data store returned a bogus error message.", __FILE__, __LINE__);
109
110 string code = err_elmnt.child_value("Code");
111 string message = err_elmnt.child_value("Message");
112
113 // We might want to get the "Code" from the "Error" if these text messages
114 // are not good enough. But the "Code" is not really suitable for normal humans...
115 // jhrg 12/31/19
116
117 if (code == "AccessDenied") {
118 stringstream msg;
119 msg << prolog << "ACCESS DENIED - The underlying object store has refused access to: "
120 << data_url->protocol() << data_url->host() << data_url->path() << " Object Store Message: "
121 << message;
122 BESDEBUG(MODULE, msg.str() << endl);
123 VERBOSE(msg.str());
124 throw BESForbiddenError(msg.str(), __FILE__, __LINE__);
125 }
126 else {
127 stringstream msg;
128 msg << prolog << "The underlying object store returned an error. " << "(Tried: " << data_url->protocol()
129 << "://" << data_url->host() << data_url->path() << ") Object Store Message: " << message;
130 BESDEBUG(MODULE, msg.str() << endl);
131 VERBOSE(msg.str());
132 throw BESInternalError(msg.str(), __FILE__, __LINE__);
133 }
134}
135
149size_t chunk_write_data(void *buffer, size_t size, size_t nmemb, void *data) {
150 BESDEBUG(MODULE, prolog << "BEGIN " << endl);
151 size_t nbytes = size * nmemb;
152 auto chunk = reinterpret_cast<Chunk *>(data);
153
154
155 auto data_url = chunk->get_data_url();
156 BESDEBUG(MODULE, prolog << "chunk->get_data_url():" << data_url << endl);
157
158 // When Content-Type is 'application/xml,' that's an error. jhrg 6/9/20
159 BESDEBUG(MODULE, prolog << "chunk->get_response_content_type():" << chunk->get_response_content_type() << endl);
160 if (chunk->get_response_content_type().find("application/xml") != string::npos) {
161 // At this point we no longer care about great performance - error msg readability
162 // is more important. jhrg 12/30/19
163 string xml_message = reinterpret_cast<const char *>(buffer);
164 xml_message.erase(xml_message.find_last_not_of("\t\n\v\f\r 0") + 1);
165 // Decode the AWS XML error message. In some cases this will fail because pub keys,
166 // which maybe in this error text, may have < or > chars in them. the XML parser
167 // will be sad if that happens. jhrg 12/30/19
168 try {
169 process_s3_error_response(data_url, xml_message); // throws a BESError
170 }
171 catch (BESError) {
172 // re-throw any BESError - added for the future if we make BESError a child
173 // of std::exception as it should be. jhrg 12/30/19
174 throw;
175 }
176 catch (std::exception &e) {
177 stringstream msg;
178 msg << prolog << "Caught std::exception when accessing object store data.";
179 msg << " (Tried: " << data_url->str() << ")" << " Message: " << e.what();
180 BESDEBUG(MODULE, msg.str() << endl);
181 throw BESSyntaxUserError(msg.str(), __FILE__, __LINE__);
182 }
183 }
184
185 // rbuf: |******++++++++++----------------------|
186 // ^ ^ bytes_read + nbytes
187 // | bytes_read
188
189 unsigned long long bytes_read = chunk->get_bytes_read();
190
191 // If this fails, the code will write beyond the buffer.
192 if (bytes_read + nbytes > chunk->get_rbuf_size()) {
193 stringstream msg;
194 msg << prolog << "ERROR! The number of bytes_read: " << bytes_read << " plus the number of bytes to read: "
195 << nbytes << " is larger than the target buffer size: " << chunk->get_rbuf_size();
196 BESDEBUG(MODULE, msg.str() << endl);
197 throw BESInternalError(msg.str(), __FILE__, __LINE__);
198 }
199
200 memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
201 chunk->set_bytes_read(bytes_read + nbytes);
202
203 BESDEBUG(MODULE, prolog << "END" << endl);
204
205 return nbytes;
206}
207
216static void inflate_sanity_check(char **destp, unsigned long long dest_len, const char *src, unsigned long long src_len) {
217 if (src_len == 0) {
218 string msg = prolog + "ERROR! The number of bytes to inflate is zero.";
219 BESDEBUG(MODULE, msg << endl);
220 throw BESInternalError(msg, __FILE__, __LINE__);
221 }
222 if (dest_len == 0) {
223 string msg = prolog + "ERROR! The number of bytes to inflate into is zero.";
224 BESDEBUG(MODULE, msg << endl);
225 throw BESInternalError(msg, __FILE__, __LINE__);
226 }
227 if (!destp || !*destp) {
228 string msg = prolog + "ERROR! The destination buffer is NULL.";
229 BESDEBUG(MODULE, msg << endl);
230 throw BESInternalError(msg, __FILE__, __LINE__);
231 }
232 if (!src) {
233 string msg = prolog + "ERROR! The source buffer is NULL.";
234 BESDEBUG(MODULE, msg << endl);
235 throw BESInternalError(msg, __FILE__, __LINE__);
236 }
237}
238
250unsigned long long inflate(char **destp, unsigned long long dest_len, char *src, unsigned long long src_len) {
251 inflate_sanity_check(destp, dest_len, src, src_len);
252
253 /* Input; uncompress */
254 z_stream z_strm; /* zlib parameters */
255
256 /* Set the decompression parameters */
257 memset(&z_strm, 0, sizeof(z_strm));
258 z_strm.next_in = (Bytef *) src;
259 z_strm.avail_in = src_len;
260 z_strm.next_out = (Bytef *) (*destp);
261 z_strm.avail_out = dest_len;
262
263 size_t nalloc = dest_len;
264
265 char *outbuf = *destp;
266
267 /* Initialize the decompression routines */
268 if (Z_OK != inflateInit(&z_strm))
269 throw BESError("Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
270
271 /* Loop to uncompress the buffer */
272 int status = Z_OK;
273 do {
274 /* Uncompress some data */
275 status = inflate(&z_strm, Z_SYNC_FLUSH);
276
277 /* Check if we are done decompressing data */
278 if (Z_STREAM_END == status) break; /*done*/
279
280 /* Check for error */
281 if (Z_OK != status) {
282 stringstream err_msg;
283 err_msg << "Failed to inflate data chunk.";
284 char const *err_msg_cstr = z_strm.msg;
285 if(err_msg_cstr)
286 err_msg << " zlib message: " << err_msg_cstr;
287 (void) inflateEnd(&z_strm);
288 throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
289 }
290 else {
291 // If we're not done and just ran out of buffer space, we need to extend the buffer.
292 // We may encounter this case when the deflate filter is used twice. KY 2022-08-03
293 if (0 == z_strm.avail_out) {
294
295 /* Allocate a buffer twice as big */
296 size_t outbuf_size = nalloc;
297 nalloc *= 2;
298 char *new_outbuf = new char[nalloc];
299 memcpy((void*)new_outbuf,(void*)outbuf,outbuf_size);
300 delete[] outbuf;
301 outbuf = new_outbuf;
302
303 /* Update pointers to buffer for next set of uncompressed data */
304 z_strm.next_out = (unsigned char*) outbuf + z_strm.total_out;
305 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
306
307 } /* end if */
308 } /* end else */
309 } while (true /* status == Z_OK */); // Exit via the break statement after the call to inflate(). jhrg 11/8/21
310
311 *destp = outbuf;
312 outbuf = nullptr;
313 /* Finish decompressing the stream */
314 (void) inflateEnd(&z_strm);
315
316 return z_strm.total_out;
317}
318
319// #define this to enable the duff's device loop unrolling code.
320// jhrg 1/19/17
321#define DUFFS_DEVICE
322
/**
 * @brief Reverse a byte 'shuffle' of fixed-width elements.
 *
 * In the shuffled input, the first byte of every element comes first, then the
 * second byte of every element, and so on. This function writes the bytes back
 * so that each element's 'width' bytes are adjacent in 'dest'. Bytes beyond the
 * last whole element (src_size % width) are copied unchanged to the end.
 *
 * When there is nothing to unshuffle (width of 0 or 1, or fewer than two whole
 * elements) the input is copied to the output as is.
 *
 * @param dest Output buffer; must hold at least src_size bytes.
 * @param src Shuffled input.
 * @param src_size Number of bytes in src.
 * @param width Number of bytes per element.
 */
void unshuffle(char *dest, const char *src, unsigned long long src_size, unsigned long long width) {
    // A zero width would divide by zero below. Treat it like the other
    // "nothing to unshuffle" cases and pass the bytes through.
    if (width == 0) {
        memcpy(dest, src, src_size);
        return;
    }

    const unsigned long long elems = src_size / width;  // int division rounds down

    /* Don't do anything for 1-byte elements, or "fractional" elements */
    if (!(width > 1 && elems > 1)) {
        memcpy(dest, src, src_size);
        return;
    }

    const char *_src = src;     // Read cursor; walks the input sequentially
    char *_dest = nullptr;      // Write cursor; strides through the output

    /* Input; unshuffle. The counter has the same type as 'width' so it cannot wrap. */
    for (unsigned long long i = 0; i < width; i++) {
        _dest = dest + i;
#ifndef DUFFS_DEVICE
        for (unsigned long long j = elems; j > 0; j--) {
            *_dest = *_src++;
            _dest += width;
        }
#else /* DUFFS_DEVICE */
        {
            unsigned long long duffs_index = (elems + 7) / 8;   /* Counting index for Duff's device */
            // elems % 8 is always in 0..7, so every possible value has a case label.
            switch (elems % 8) {
                case 0:
                    do {
                        // This macro saves repeating the same line 8 times
#define DUFF_GUTS *_dest = *_src++; _dest += width;

                        DUFF_GUTS
                case 7:
                        DUFF_GUTS
                case 6:
                        DUFF_GUTS
                case 5:
                        DUFF_GUTS
                case 4:
                        DUFF_GUTS
                case 3:
                        DUFF_GUTS
                case 2:
                        DUFF_GUTS
                case 1:
                        DUFF_GUTS
                    } while (--duffs_index > 0);
            } /* end switch */
        } /* end block */
#endif /* DUFFS_DEVICE */
    } /* end for i = 0 to width*/

    /* Compute the leftover bytes if there are any */
    const unsigned long long leftover = src_size % width;

    /* Add leftover to the end of data */
    if (leftover > 0) {
        /* Adjust back to end of shuffled bytes */
        _dest -= (width - 1);
        memcpy(_dest, _src, leftover);
    }
}
412
/**
 * @brief Parse a comma-separated list of unsigned integers.
 *
 * Every field, including the text after the last comma, is converted with
 * stoull() and appended to 'res'. Exceptions thrown by stoull() (for example
 * for an empty field) propagate to the caller.
 *
 * @param s The text to split, e.g. "1,2,3".
 * @param res Value-result parameter; parsed values are appended to it.
 */
static void split_by_comma(const string &s, vector<unsigned long long> &res)
{
    size_t field_begin = 0;

    for (;;) {
        const size_t comma = s.find(',', field_begin);
        if (comma == string::npos) {
            // Last (or only) field: everything up to the end of the string.
            res.push_back(stoull(s.substr(field_begin)));
            return;
        }

        res.push_back(stoull(s.substr(field_begin, comma - field_begin)));
        field_begin = comma + 1;
    }
}
432
433void Chunk::parse_chunk_position_in_array_string(const string &pia, vector<unsigned long long> &cpia_vect)
434{
435 if (pia.empty()) return;
436
437 if (!cpia_vect.empty()) cpia_vect.clear();
438
439 // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
440 // [1] is a minimal 'position in array' string.
441 if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.size() < 3)
442 throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
443
444 if (pia.find_first_not_of("[]1234567890,") != string::npos)
445 throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
446
447 try {
448 split_by_comma(pia.substr(1, pia.size() - 2), cpia_vect);
449 }
450 catch(const std::invalid_argument &e) {
451 throw BESInternalError(string("while parsing a DMR++, chunk position string illegal character(s): ").append(e.what()), __FILE__, __LINE__);
452 }
453}
454
455
469void Chunk::set_position_in_array(const string &pia) {
470 parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
471}
472
481void Chunk::set_position_in_array(const std::vector<unsigned long long> &pia) {
482 if (pia.empty()) return;
483
484 if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
485
486 d_chunk_position_in_array = pia;
487}
488
497 return curl::get_range_arg_string(d_offset, d_size);
498}
499
518
519 // If there is no data url then there is nothing to add the parameter too.
520 if(d_data_url == nullptr)
521 return;
522
523 bool found = false;
524 string cloudydap_context_value = BESContextManager::TheManager()->get_context(S3_TRACKING_CONTEXT, found);
525 if (!found)
526 return;
527
541
542 bool add_tracking = false;
543
544 // All S3 buckets, virtual host style URL
545 // Simpler regex that's likely equivalent:
546 // ^https?:\/\/[a-z0-9]([-.a-z0-9]){1,61}[a-z0-9]\.s3[-.]us-(east|west)-[12])?\.amazonaws\.com\/.*$
547 string s3_vh_regex_str = R"(^https?:\/\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\.s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/.*$)";
548
549 BESRegex s3_vh_regex(s3_vh_regex_str.c_str());
550 int match_result = s3_vh_regex.match(d_data_url->str().c_str(), d_data_url->str().size());
551 if(match_result>=0) {
552 auto match_length = (unsigned int) match_result;
553 if (match_length == d_data_url->str().size()) {
554 BESDEBUG(MODULE,
555 prolog << "FULL MATCH. pattern: " << s3_vh_regex_str << " url: " << d_data_url->str() << endl);
556 add_tracking = true;;
557 }
558 }
559
560 if(!add_tracking){
561 // All S3 buckets, path style URL
562 string s3_path_regex_str = R"(^https?:\/\/s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\/.*$)";
563 BESRegex s3_path_regex(s3_path_regex_str.c_str());
564 match_result = s3_path_regex.match(d_data_url->str().c_str(), d_data_url->str().size());
565 if(match_result>=0) {
566 auto match_length = (unsigned int) match_result;
567 if (match_length == d_data_url->str().size()) {
568 BESDEBUG(MODULE,
569 prolog << "FULL MATCH. pattern: " << s3_vh_regex_str << " url: " << d_data_url->str() << endl);
570 add_tracking = true;;
571 }
572 }
573 }
574
575 if (add_tracking) {
576 // Yup, headed to S3.
577 d_query_marker.append(S3_TRACKING_CONTEXT).append("=").append(cloudydap_context_value);
578 }
579}
580
581static void checksum_fletcher32_sanity_check(const void *_data, size_t _len) {
582 // Sanity check
583 if (!_data) {
584 string msg = prolog + "ERROR! checksum_fletcher32_sanity_check: _data is NULL";
585 BESDEBUG(MODULE, msg << endl);
586 throw BESInternalError(msg, __FILE__, __LINE__);
587
588 }
589 if (_len == 0) {
590 string msg = prolog + "ERROR! checksum_fletcher32_sanity_check: _len is 0";
591 BESDEBUG(MODULE, msg << endl);
592 throw BESInternalError(msg, __FILE__, __LINE__);
593 }
594}
602uint32_t
603checksum_fletcher32(const void *_data, size_t _len)
604{
605 checksum_fletcher32_sanity_check(_data, _len);
606
607 const auto *data = (const uint8_t *)_data; // Pointer to the data to be summed
608 size_t len = _len / 2; // Length in 16-bit words
609 uint32_t sum1 = 0, sum2 = 0;
610
611 // Compute checksum for pairs of bytes
612 // (the magic "360" value is the largest number of sums that can be performed without numeric overflow)
613 while (len) {
614 size_t tlen = len > 360 ? 360 : len;
615 len -= tlen;
616 do {
617 sum1 += (uint32_t)(((uint16_t)data[0]) << 8) | ((uint16_t)data[1]);
618 data += 2;
619 sum2 += sum1;
620 } while (--tlen);
621 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
622 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
623 }
624
625 /* Check for odd # of bytes */
626 if(_len % 2) {
627 sum1 += (uint32_t)(((uint16_t)*data) << 8);
628 sum2 += sum1;
629 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
630 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
631 } /* end if */
632
633 /* Second reduction step to reduce sums to 16 bits */
634 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
635 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
636
637 return ((sum2 << 16) | sum1);
638} /* end checksum_fletcher32() */
639
650void Chunk::filter_chunk(const string &filters, unsigned long long chunk_size, unsigned long long elem_width) {
651
652 if (d_is_inflated)
653 return;
654
655 chunk_size *= elem_width;
656
657 vector<string> filter_array = BESUtil::split(filters, ' ' );
658
659 // We need to check if the filters that include the deflate filters are contiguous.
660 // That is: the filters must be something like "deflate deflate deflate" instead of "deflate other_filter deflate"
661
662 bool is_1st_deflate = true;
663 unsigned cur_deflate_index = 0;
664 unsigned num_deflate = 0;
665
666 for (unsigned i = 0; i<filter_array.size(); i++) {
667
668 if (filter_array[i] == "deflate") {
669 if (is_1st_deflate == true) {
670 cur_deflate_index = i;
671 is_1st_deflate = false;
672 }
673 else if (i != (cur_deflate_index+1)) {
674 throw BESInternalError("The deflate filters must be adjacent to each other",
675 __FILE__, __LINE__);
676 }
677 else
678 cur_deflate_index = i;
679 num_deflate++;
680 }
681
682 }
683
684 // If there are >1 deflate filters applied, we want to localize the handling of the compressed data in this function.
685 unsigned deflate_index = 0;
686 unsigned long long out_buf_size = 0;
687 unsigned long long in_buf_size = 0;
688 char**destp = nullptr;
689 char* dest_deflate = nullptr;
690 char* tmp_dest = nullptr;
691
692 bool ignore_rest_deflate = false;
693
694 for (auto i = filter_array.rbegin(), e = filter_array.rend(); i != e; ++i) {
695
696 string filter = *i;
697
698 if (filter == "deflate") {
699
700 // Here we find that the deflate filter is applied twice.
701 // Note: we find one GHRSST file is using the deflate twice,
702 // However, for one chunk the deflate filter only applies once.
703 // The returned decompressed size of this chunk is equal to
704 // the chunk size. So we have to skip the second "inflate" of this chunk by
705 // checking if the inflated size is equal to the chunk size. KY 2022-08-07
706
707 if (num_deflate > 1 && !ignore_rest_deflate) {
708
709 dest_deflate = new char[chunk_size];
710 try {
711 destp = &dest_deflate;
712 if (deflate_index == 0) {
713 // First inflate, receive the buffer and the corresponding info from
714 // the BES, save the inflated buffer into a tmp. buffer.
715 out_buf_size = inflate(destp, chunk_size, get_rbuf(), get_rbuf_size());
716 tmp_dest = *destp;
717 }
718 else {
719 // Obtain the buffer and the size locally starting from the second inflate.
720 // Remember to release the tmp_buf memory.
721#if 0
722 tmp_buf = tmp_dest;
723 out_buf_size = inflate(destp, chunk_size, tmp_buf, in_buf_size);
724 tmp_dest = *destp;
725 delete[] tmp_buf;
726#endif
727 out_buf_size = inflate(destp, chunk_size, tmp_dest, in_buf_size);
728 delete[] tmp_dest;
729 tmp_dest = *destp;
730
731 }
732 deflate_index ++;
733 in_buf_size = out_buf_size;
734#if DMRPP_USE_SUPER_CHUNKS
735 // This is the last deflate filter, output the buffer.
736 if (in_buf_size == chunk_size)
737 ignore_rest_deflate = true;
738 if (ignore_rest_deflate || deflate_index == num_deflate) {
739 char* newdest = *destp;
740 // Need to use the out_buf_size insted of chunk_size since the size may be bigger than chunk_size. KY 2023-06-08
741 set_read_buffer(newdest, out_buf_size, chunk_size, true);
742 }
743
744#else
745 set_rbuf(dest_deflate, chunk_size);
746#endif
747
748 }
749 catch (...) {
750 delete[] dest_deflate;
751 delete[] tmp_dest;
752 throw;
753 }
754
755
756 }
757 else if(num_deflate == 1) {
758 // The following is the same code as before. We need to use the double pointer
759 // to pass the buffer. KY 2022-08-07
760 dest_deflate = new char[chunk_size];
761 destp = &dest_deflate;
762 try {
763 out_buf_size = inflate(destp, chunk_size, get_rbuf(), get_rbuf_size());
764 if (out_buf_size == 0) {
765 throw BESError("inflate size should be greater than 0", BES_INTERNAL_ERROR, __FILE__, __LINE__);
766 }
767 // This replaces (and deletes) the original read_buffer with dest.
768#if DMRPP_USE_SUPER_CHUNKS
769 char* new_dest=*destp;
770 set_read_buffer(new_dest, out_buf_size, chunk_size, true);
771#else
772 set_rbuf(dest_deflate, chunk_size);
773#endif
774 }
775 catch (...) {
776 delete[] dest_deflate;
777 throw;
778 }
779 }
780 }// end filter is deflate
781 else if (filter == "shuffle"){
782 // The internal buffer is chunk's full size at this point.
783 char *dest = new char[get_rbuf_size()];
784 try {
785 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
786#if DMRPP_USE_SUPER_CHUNKS
788#else
789 set_rbuf(dest, get_rbuf_size());
790#endif
791 }
792 catch (...) {
793 delete[] dest;
794 throw;
795 }
796 } //end filter is shuffle
797 else if (filter == "fletcher32"){
798 // Compute the fletcher32 checksum and compare to the value of the last four bytes of the chunk.
799#if ACTUALLY_USE_FLETCHER32_CHECKSUM
800 // Get the last four bytes of chunk's data (which is a byte array) and treat that as the four-byte
801 // integer fletcher32 checksum. jhrg 10/15/21
802 if (get_rbuf_size() <= FLETCHER32_CHECKSUM) {
803 throw BESInternalError("fletcher32 filter: buffer size is less than the size of the checksum", __FILE__, __LINE__);
804 }
805
806 // Where is the checksum value?
807 auto data_ptr = get_rbuf() + get_rbuf_size() - FLETCHER32_CHECKSUM;
808 // Using a temporary variable ensures that the value is correctly positioned
809 // on a 4 byte memory alignment. Casting data_ptr to a pointer to uint_32 does not.
810 uint32_t f_checksum;
811 memcpy(&f_checksum, data_ptr, FLETCHER32_CHECKSUM );
812
813 // If the code should actually use the checksum (they can be expensive to compute), does it match
814 // with once computed on the data actually read? Maybe make this a bes.conf parameter?
815 // jhrg 10/15/21
816 uint32_t calc_checksum = checksum_fletcher32((const void *)get_rbuf(), get_rbuf_size() - FLETCHER32_CHECKSUM);
817
818 BESDEBUG(MODULE, prolog << "get_rbuf_size(): " << get_rbuf_size() << endl);
819 BESDEBUG(MODULE, prolog << "calc_checksum: " << calc_checksum << endl);
820 BESDEBUG(MODULE, prolog << "f_checksum: " << f_checksum << endl);
821 if (f_checksum != calc_checksum) {
822 throw BESInternalError("Data read from the DMR++ handler did not match the Fletcher32 checksum.",
823 __FILE__, __LINE__);
824 }
825#endif
826 if (d_read_buffer_size > FLETCHER32_CHECKSUM)
827 d_read_buffer_size -= FLETCHER32_CHECKSUM;
828 else {
829 throw BESInternalError("Data filtered with fletcher32 don't include the four-byte checksum.",
830 __FILE__, __LINE__);
831 }
832 } // end filter is fletcher32
833 } // end for loop
834 d_is_inflated = true;
835}
836
837unsigned int Chunk::obtain_compound_udf_type_size() const {
838
839 unsigned int ret_value = 0;
840
841 for (const auto &cudf_type_elm:compound_udf_type_elms) {
842
843 auto dtype = cudf_type_elm.first;
844 int type_size = 1;
845
846 switch(dtype) {
847 case libdap::dods_int8_c:
848 type_size = sizeof(int8_t);
849 break;
850
851 case libdap::dods_int16_c:
852 type_size = sizeof(int16_t);
853 break;
854
855 case libdap::dods_int32_c:
856 type_size = sizeof(int32_t);
857 break;
858
859 case libdap::dods_int64_c:
860 type_size = sizeof(int64_t);
861 break;
862
863 case libdap::dods_uint8_c:
864 case libdap::dods_byte_c:
865 type_size = sizeof(uint8_t);
866 break;
867
868 case libdap::dods_uint16_c:
869 type_size = sizeof(uint16_t);
870 break;
871
872 case libdap::dods_uint32_c:
873 type_size = sizeof(uint32_t);
874 break;
875
876 case libdap::dods_uint64_c:
877 type_size = sizeof(uint64_t);
878 break;
879
880 case libdap::dods_float32_c:
881 type_size = sizeof(float);
882 break;
883
884 case libdap::dods_float64_c:
885 type_size = sizeof(double);
886 break;
887
888 default:
889 throw BESInternalError("Unsupported user-defined fill value compound base type.", __FILE__, __LINE__);
890 }
891
892 ret_value += cudf_type_elm.second *type_size;
893 }
894
895 return ret_value;
896
897}
898
899unsigned int Chunk::get_value_size(libdap::Type type)
900{
901
902 if (type == libdap::dods_structure_c) {
903 if (struct_size !=0)
904 return struct_size;
905 else if (!compound_udf_type_elms.empty())
906 return obtain_compound_udf_type_size();
907 }
908 switch(type) {
909 case libdap::dods_int8_c:
910 return sizeof(int8_t);
911
912 case libdap::dods_int16_c:
913 return sizeof(int16_t);
914
915 case libdap::dods_int32_c:
916 return sizeof(int32_t);
917
918 case libdap::dods_int64_c:
919 return sizeof(int64_t);
920
921 case libdap::dods_uint8_c:
922 case libdap::dods_byte_c:
923 return sizeof(uint8_t);
924
925 case libdap::dods_uint16_c:
926 return sizeof(uint16_t);
927
928 case libdap::dods_uint32_c:
929 return sizeof(uint32_t);
930
931 case libdap::dods_uint64_c:
932 return sizeof(uint64_t);
933
934 case libdap::dods_float32_c:
935 return sizeof(float);
936
937 case libdap::dods_float64_c:
938 return sizeof(double);
939
940 default:
941 throw BESInternalError("Unknown fill value type.", __FILE__, __LINE__);
942 }
943}
944
945void Chunk::get_compound_fvalue(const string &v, vector<char> &compound_fvalue) const{
946
947 vector<string> fv_str;
948 obtain_fv_strs(fv_str,v);
949
950 char * temp_compound_fvalue = compound_fvalue.data();
951 size_t fvalue_count = 0;
952
953 for (const auto &cudf_type_elm:compound_udf_type_elms) {
954
955 auto dtype = cudf_type_elm.first;
956 auto num_elms = cudf_type_elm.second;
957 int type_size = 1;
958
959 switch(dtype) {
960 case libdap::dods_int8_c:
961 type_size = sizeof(int8_t);
962 for (int i = 0; i<num_elms;i++) {
963 auto temp_value = (int8_t)stoi(fv_str[fvalue_count]);
964 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
965 fvalue_count++;
966 temp_compound_fvalue +=type_size;
967 }
968 break;
969
970 case libdap::dods_int16_c:
971 type_size = sizeof(int16_t);
972 for (int i = 0; i<num_elms;i++) {
973 auto temp_value = (int16_t)stoi(fv_str[fvalue_count]);
974 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
975 fvalue_count++;
976 temp_compound_fvalue +=type_size;
977 }
978
979 break;
980
981 case libdap::dods_int32_c:
982 type_size = sizeof(int32_t);
983 for (int i = 0; i<num_elms;i++) {
984 auto temp_value = (int32_t)stoi(fv_str[fvalue_count]);
985 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
986 fvalue_count++;
987 temp_compound_fvalue +=type_size;
988 }
989 break;
990
991 case libdap::dods_int64_c:
992 type_size = sizeof(int64_t);
993 for (int i = 0; i<num_elms;i++) {
994 auto temp_value = (int64_t)stoll(fv_str[fvalue_count]);
995 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
996 fvalue_count++;
997 temp_compound_fvalue +=type_size;
998 }
999 break;
1000
1001 case libdap::dods_uint8_c:
1002 case libdap::dods_byte_c:
1003 type_size = sizeof(uint8_t);
1004 for (int i = 0; i<num_elms;i++) {
1005 auto temp_value = (uint8_t)stoi(fv_str[fvalue_count]);
1006 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
1007 fvalue_count++;
1008 temp_compound_fvalue +=type_size;
1009 }
1010 break;
1011
1012 case libdap::dods_uint16_c:
1013 type_size = sizeof(uint16_t);
1014 for (int i = 0; i<num_elms;i++) {
1015 auto temp_value = (uint16_t)stoi(fv_str[fvalue_count]);
1016 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
1017 fvalue_count++;
1018 temp_compound_fvalue +=type_size;
1019 }
1020 break;
1021
1022 case libdap::dods_uint32_c:
1023 type_size = sizeof(uint32_t);
1024 for (int i = 0; i<num_elms;i++) {
1025 auto temp_value = (uint32_t)stoul(fv_str[fvalue_count]);
1026 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
1027 fvalue_count++;
1028 temp_compound_fvalue +=type_size;
1029 }
1030 break;
1031
1032 case libdap::dods_uint64_c:
1033 type_size = sizeof(uint64_t);
1034 for (int i = 0; i<num_elms;i++) {
1035 auto temp_value = (uint64_t)stoull(fv_str[fvalue_count]);
1036 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
1037 fvalue_count++;
1038 temp_compound_fvalue +=type_size;
1039 }
1040 break;
1041
1042 case libdap::dods_float32_c:
1043 type_size = sizeof(float);
1044 for (int i = 0; i<num_elms;i++) {
1045 auto temp_value = (float)stof(fv_str[fvalue_count]);
1046 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
1047 fvalue_count++;
1048 temp_compound_fvalue +=type_size;
1049 }
1050 break;
1051
1052 case libdap::dods_float64_c:
1053 type_size = sizeof(double);
1054 for (int i = 0; i<num_elms;i++) {
1055 auto temp_value = (double)stod(fv_str[fvalue_count]);
1056 memcpy(temp_compound_fvalue,(char *)&temp_value,type_size);
1057 fvalue_count++;
1058 temp_compound_fvalue +=type_size;
1059 }
1060 break;
1061
1062 default:
1063 throw BESInternalError("Unsupported user-defined fill value compound base type.", __FILE__, __LINE__);
1064 }
1065
1066 }
1067
1068}
1069
1070void Chunk::obtain_fv_strs(vector<string>& fv_str, const string &v) const{
1071
1072 string::size_type start = 0;
1073 string::size_type end = 0;
1074 char sep=' ';
1075
1076 while ((end = v.find(sep, start)) != string::npos) {
1077 fv_str.push_back(v.substr(start, end - start));
1078 start = end + 1;
1079 }
1080 fv_str.push_back(v.substr(start));
1081
1082}
1083
1084
1085const char * Chunk::get_value_ptr(fill_value &fv, libdap::Type type, const string &v, bool is_big_endian)
1086{
1087
1088 switch(type) {
1089 case libdap::dods_int8_c:
1090 fv.int8 = (int8_t)stoi(v);
1091 return (const char *)&fv.int8;
1092
1093 case libdap::dods_int16_c:
1094 fv.int16 = (int16_t)stoi(v);
1095 if (is_big_endian)
1096 fv.int16 = bswap_16(fv.int16);
1097 return (const char *)&fv.int16;
1098
1099 case libdap::dods_int32_c:
1100
1101 fv.int32 = (int32_t)stoi(v);
1102 if (is_big_endian)
1103 fv.int32 = bswap_32(fv.int32);
1104 return (const char *)&fv.int32;
1105
1106 case libdap::dods_int64_c:
1107 fv.int64 = (int64_t)stoll(v);
1108 if (is_big_endian)
1109 fv.int64 = bswap_64(fv.int64);
1110 return (const char *)&fv.int64;
1111
1112 case libdap::dods_uint8_c:
1113 case libdap::dods_byte_c:
1114 fv.uint8 = (uint8_t)stoi(v);
1115 return (const char *)&fv.uint8;
1116
1117 case libdap::dods_uint16_c:
1118 fv.uint16 = (uint16_t)stoi(v);
1119 if (is_big_endian)
1120 fv.uint16 = bswap_16(fv.uint16);
1121 return (const char *)&fv.uint16;
1122
1123 case libdap::dods_uint32_c:
1124 fv.uint32 = (uint32_t)stoul(v);
1125 if (is_big_endian)
1126 fv.uint32 = bswap_32(fv.uint32);
1127 return (const char *)&fv.uint32;
1128
1129 case libdap::dods_uint64_c:
1130 fv.uint64 = (uint64_t)stoull(v);
1131 if (is_big_endian)
1132 fv.uint64 = bswap_32(fv.uint64);
1133 return (const char *)&fv.uint64;
1134
1135 case libdap::dods_float32_c:
1136 {
1137 fv.f = stof(v);
1138 auto fv_float_p=(char *)&fv.f;
1139 if (is_big_endian)
1140 swap_float32(fv_float_p,1);
1141 return (const char *)fv_float_p;
1142 }
1143
1144 case libdap::dods_float64_c:
1145 {
1146 fv.d = stod(v);
1147 auto fv_double_p=(char *)&fv.d;
1148 if (is_big_endian)
1149 swap_float64 (fv_double_p,1);
1150 return (const char *)fv_double_p;
1151 }
1152 case libdap::dods_str_c:
1153 return v.c_str();
1154
1155 default:
1156 throw BESInternalError("Unknown fill value type.", __FILE__, __LINE__);
1157 }
1158}
1159
1164
1165 fill_value fv;
1166 unsigned int value_size = 0;
1167
1168 if (d_fill_value_type == libdap::dods_str_c)
1169 value_size = (unsigned int)d_fill_value.size();
1170 else
1171 value_size = get_value_size(d_fill_value_type);
1172
1173 bool is_big_endian = false;
1174 if (d_byte_order == "BE")
1175 is_big_endian = true;
1176
1177 const char *value = nullptr;
1178 vector<char> compound_fvalue;
1179
1180 if (d_fill_value_type == libdap::dods_structure_c && (!compound_udf_type_elms.empty() || struct_size !=0)) {
1181
1182 if (value_size == 0)
1183 throw BESInternalError("The size of fill value should NOT be 0.", __FILE__,__LINE__);
1184 compound_fvalue.resize(value_size);
1185 // When the fill_value is 0 for the compound datatype, we don't need to retrieve the filled values.
1186 // This will effectively resolve the compound datatype default filled value case.
1187 if (d_fill_value !="0")
1188 get_compound_fvalue(d_fill_value,compound_fvalue);
1189 value = compound_fvalue.data();
1190
1191 }
1192 else {
1193
1194 value = get_value_ptr(fv, d_fill_value_type, d_fill_value,is_big_endian);
1195
1196 // If a string is empty, current build_dmrpp will assign an "" to fillvalue and causes the value size to be 0.
1197 if(d_fill_value_type == libdap::dods_str_c && d_fill_value==""){
1198 d_fill_value = ' ';
1199 value_size = 1;
1200 }
1201
1202 if (value_size == 0)
1203 throw BESInternalError("The size of fill value should NOT be 0.", __FILE__,__LINE__);
1204 }
1205
1206 unsigned long long num_values = get_rbuf_size() / value_size;
1207
1208
1209 char *buffer = get_rbuf();
1210
1211 for (unsigned long long i = 0; i < num_values; ++i, buffer += value_size) {
1212 memcpy(buffer, value, value_size);
1213 }
1214
1216}
1217
1228 if (d_is_read)
1229 return;
1230
1231 // By default, d_read_buffer_is_mine is true. But if this is part of a SuperChunk
1232 // then the SuperChunk will have allocated memory and d_read_buffer_is_mine is false.
1233 if (d_read_buffer_is_mine)
1235
1236 if (d_uses_fill_value) {
1238 }
1239 else {
1240 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(this);
1241 if (!handle)
1242 throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
1243
1244 try {
1245 handle->read_data(); // retries until success when appropriate, else throws
1246 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
1247 }
1248 catch (...) {
1249 // TODO See https://bugs.earthdata.nasa.gov/browse/HYRAX-378
1250 // It may be that this is the code that catches throws from
1251 // chunk_write_data and based on read_data()'s behavior, the
1252 // code should probably stop _all_ transfers, reclaim all
1253 // handles and send a failure message up the call stack.
1254 // jhrg 4/7/21
1255 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
1256 throw;
1257 }
1258 }
1259
1260 // If the expected byte count was not read, it's an error.
1261 if (get_size() != get_bytes_read()) {
1262 ostringstream oss;
1263 oss << "Wrong number of bytes read for chunk; read: " << get_bytes_read() << ", expected: " << get_size();
1264 throw BESInternalError(oss.str(), __FILE__, __LINE__);
1265 }
1266
1267 d_is_read = true;
1268}
1269
1270// direct IO method that reads chunks.
1271void Chunk::read_chunk_dio() {
1272
1273 // Read chunk for dio - use read_chunk() as a reference.
1274 if (d_is_read)
1275 return;
1276
1277 // By default, d_read_buffer_is_mine is true. But if this is part of a SuperChunk
1278 // then the SuperChunk will have allocated memory and d_read_buffer_is_mine is false.
1279 if (d_read_buffer_is_mine)
1281
1282 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(this);
1283 if (!handle)
1284 throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
1285
1286 try {
1287 handle->read_data(); // retries until success when appropriate, else throws
1288 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
1289 }
1290 catch (...) {
1291 // TODO See https://bugs.earthdata.nasa.gov/browse/HYRAX-378
1292 // It may be that this is the code that catches throws from
1293 // chunk_write_data and based on read_data()'s behavior, the
1294 // code should probably stop _all_ transfers, reclaim all
1295 // handles and send a failure message up the call stack.
1296 // jhrg 4/7/21
1297 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
1298 throw;
1299 }
1300
1301
1302 // If the expected byte count was not read, it's an error.
1303 if (get_size() != get_bytes_read()) {
1304 ostringstream oss;
1305 oss << "Wrong number of bytes read for chunk; read: " << get_bytes_read() << ", expected: " << get_size();
1306 throw BESInternalError(oss.str(), __FILE__, __LINE__);
1307 }
1308
1309 d_is_read = true;
1310
1311
1312}
1313
1323void Chunk::dump(ostream &oss) const {
1324 oss << "Chunk";
1325 oss << "[ptr='" << (void *) this << "']";
1326 oss << "[data_url='" << d_data_url->str() << "']";
1327 oss << "[offset=" << d_offset << "]";
1328 oss << "[size=" << d_size << "]";
1329 oss << "[chunk_position_in_array=(";
1330 for (unsigned long long i = 0; i < d_chunk_position_in_array.size(); i++) {
1331 if (i) oss << ",";
1332 oss << d_chunk_position_in_array[i];
1333 }
1334 oss << ")]";
1335 oss << "[is_read=" << d_is_read << "]";
1336 oss << "[is_inflated=" << d_is_inflated << "]";
1337}
1338
1339string Chunk::to_string() const {
1340 std::ostringstream oss;
1341 dump(oss);
1342 return oss.str();
1343}
1344
1357std::shared_ptr<http::url> Chunk::get_data_url() const {
1358
1359 // The d_data_url may be nullptr(fillvalue case).
1360 if (d_data_url == nullptr)
1361 return d_data_url;
1362 std::shared_ptr<http::EffectiveUrl> effective_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
1363 BESDEBUG(MODULE, prolog << "Using data_url: " << effective_url->str() << endl);
1364
1365#if ENABLE_TRACKING_QUERY_PARAMETER
1366 //A conditional call to void Chunk::add_tracking_query_param()
1367 // here for the NASA cost model work THG's doing. jhrg 8/7/18
1368 if (!d_query_marker.empty()) {
1369 string url_str = effective_url->str();
1370 if(url_str.find('?') != string::npos){
1371 url_str.append("&");
1372 }
1373 else {
1374 url_str.append("?");
1375 }
1376 url_str += d_query_marker;
1377 shared_ptr<http::url> query_marker_url( new http::url(url_str));
1378 return query_marker_url;
1379 }
1380#endif
1381
1382 return effective_url;
1383}
1384
1385} // namespace dmrpp
1386
Base exception class for the BES with basic string message.
Definition BESError.h:66
exception thrown if internal error encountered
Regular expression matching.
Definition BESRegex.h:89
int match(const char *s, int len, int pos=0) const
Does the pattern match.
Definition BESRegex.cc:70
static std::vector< std::string > split(const std::string &s, char delim='/', bool skip_empty=true)
Splits the string s into the return vector of tokens using the delimiter delim and skipping empty val...
Definition BESUtil.cc:1068
static EffectiveUrlCache * TheCache()
Get the singleton EffectiveUrlCache instance.
virtual void set_bytes_read(unsigned long long bytes_read)
Set the size of this Chunk's data block.
Definition Chunk.h:484
virtual void dump(std::ostream &strm) const
Definition Chunk.cc:1323
virtual char * get_rbuf()
Definition Chunk.h:515
virtual void read_chunk()
Definition Chunk.cc:1227
void add_tracking_query_param()
Modify this chunk's data URL so that it includes tracking info.
Definition Chunk.cc:517
virtual std::string get_curl_range_arg_string()
Returns a curl range argument. libcurl requires a string argument for range-get activities,...
Definition Chunk.cc:496
virtual std::shared_ptr< http::url > get_data_url() const
Get the data URL for this chunk.
Definition Chunk.cc:1357
virtual void set_rbuf_to_size()
Allocates the internal read buffer to be d_size bytes.
Definition Chunk.h:503
virtual unsigned long long get_bytes_read() const
Definition Chunk.h:475
void set_position_in_array(const std::string &pia)
parse the chunk position string
Definition Chunk.cc:469
virtual unsigned long long get_rbuf_size() const
Definition Chunk.h:543
virtual unsigned long long get_size() const
Definition Chunk.h:406
void set_read_buffer(char *buf, unsigned long long buf_size, unsigned long long bytes_read=0, bool assume_ownership=true)
Set the target read buffer for this chunk.
Definition Chunk.h:531
virtual void filter_chunk(const std::string &filters, unsigned long long chunk_size, unsigned long long elem_width)
filter data in the chunk
Definition Chunk.cc:650
virtual void load_fill_values()
Load the chunk with fill values - temporary implementation.
Definition Chunk.cc:1163
void set_response_content_type(const std::string &ct)
Set the response type of the last response.
Definition Chunk.h:400
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.
std::shared_ptr< EffectiveUrl > get_effective_url(std::shared_ptr< url > source_url)
Parse a URL into the protocol, host, path and query parts.
Definition url_impl.h:44