bes Updated for version 3.21.1
The Backend Server (BES) is the lower two tiers of the Hyrax data server
DMZ.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of the BES
4
5// Copyright (c) 2021 OPeNDAP, Inc.
6// Author: James Gallagher <jgallagher@opendap.org>
7//
8// This library is free software; you can redistribute it and/or
9// modify it under the terms of the GNU Lesser General Public
10// License as published by the Free Software Foundation; either
11// version 2.1 of the License, or (at your option) any later version.
12//
13// This library is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16// Lesser General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public
19// License along with this library; if not, write to the Free Software
20// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21//
22// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <stack>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <vector>

#include <zlib.h>
32
33#include <libdap/BaseType.h>
34#include <libdap/Array.h>
35#include <libdap/Type.h>
36#include <libdap/D4Dimensions.h>
37#include <libdap/D4Group.h>
38#include <libdap/D4BaseTypeFactory.h>
39#include <libdap/D4Enum.h>
40#include <libdap/D4EnumDefs.h>
41#include <libdap/D4Attributes.h>
42#include <libdap/D4Maps.h>
43#include <libdap/DMR.h>
44#include <libdap/util.h> // is_simple_type()
45
46#include "DmrppNames.h"
47
48#define PUGIXML_NO_XPATH
49#define PUGIXML_HEADER_ONLY
50#include <pugixml.hpp>
51
52#include "url_impl.h" // see bes/http
53#include "DMRpp.h"
54#include "DMZ.h" // this includes the pugixml header
55#include "Chunk.h"
56#include "DmrppCommon.h"
57#include "DmrppArray.h"
58#include "DmrppStructure.h"
59#include "DmrppByte.h"
60#include "DmrppStr.h"
61#include "DmrppUrl.h"
62#include "DmrppD4Group.h"
63#include "Base64.h"
64#include "DmrppRequestHandler.h"
65#include "DmrppChunkOdometer.h"
66#include "TheBESKeys.h"
67#include "BESDebug.h"
68#include "BESUtil.h"
69#include "BESLog.h"
70#include "vlsa_util.h"
71
72using namespace pugi;
73using namespace std;
74using namespace libdap;
75
76// The pugixml library does not grok namespaces. So, for a tag named 'dmrpp:chunks'
77// if TREAT_NAMESPACES_AS_LITERALS is '1' the parser matches the whole string. If it
78// is '0' the parser only matches the characters after the colon. In both cases the
79// namespace (as XML intends) is not used. Using '1' is a bit more efficient.
80// jhrg 11/2/21
81#define TREAT_NAMESPACES_AS_LITERALS 1
82
// The code can either search the XML for a DAP variable's information, or it can
// record the variable's XML node during the parse. Set this when/if the code does the
// latter. Using this simplifies the lazy-load process, particularly for the DAP2 DDS and
// data responses (which have not yet been coded completely). jhrg 11/17/21
87#define USE_CACHED_XML_NODE 1
88
89#define SUPPORT_FILL_VALUE_CHUNKS 1
90
91#define prolog std::string("DMZ::").append(__func__).append("() - ")
92
93
94namespace dmrpp {
95
96using shape = std::vector<unsigned long long>;
97
98// The original unsupported fillValue flags from 4/22
99constexpr static const auto UNSUPPORTED_STRING = "unsupported-string";
100constexpr static const auto UNSUPPORTED_ARRAY = "unsupported-array";
101constexpr static const auto UNSUPPORTED_COMPOUND = "unsupported-compound";
102// Added when Arrays Of Fixed Length Strings. The unsupported-string value was dropped at that time.
103constexpr static const auto UNSUPPORTED_VARIABLE_LENGTH_STRING = "unsupported-variable-length-string";
104
105constexpr static const auto ELIDE_UNSUPPORTED_KEY = "DMRPP.Elide.Unsupported";
106
107bool DMZ::d_elide_unsupported = true;
108
109
110#if 1
111const std::set<std::string> DMZ::variable_elements{"Byte", "Int8", "Int16", "Int32", "Int64", "UInt8", "UInt16", "UInt32",
112 "UInt64", "Float32", "Float64", "String", "Structure", "Sequence",
113 "Enum", "Opaque"};
114#endif
115
116
117
/**
 * @brief Test whether an XML name (or value) matches a key.
 *
 * When TREAT_NAMESPACES_AS_LITERALS is non-zero only an exact string match counts.
 * Otherwise a match of the text following the first ':' in @p value also counts.
 *
 * @param value The string read from the XML document.
 * @param key The string to look for.
 * @return True if the two are considered equal under the rule above.
 */
static inline bool is_eq(const char *value, const char *key)
{
    const bool exact_match = (strcmp(value, key) == 0);
#if TREAT_NAMESPACES_AS_LITERALS
    return exact_match;
#else
    if (exact_match)
        return true;

    // Retry with whatever follows the first colon, i.e. without the prefix.
    const char *separator = strchr(value, ':');
    if (separator == nullptr)
        return false;

    return strcmp(separator + 1, key) == 0;
#endif
}
133
135static inline bool has_dim_nodes(const xml_node &var_node)
136{
137 return var_node.child("Dim"); // just one is enough
138}
139
/**
 * @brief Is a name present in a set of names?
 * @param elements_set The set to search.
 * @param element_name The name to look for.
 * @return True if @p element_name is in @p elements_set.
 */
static inline bool member_of(const set<string> &elements_set, const string &element_name)
{
    return elements_set.count(element_name) != 0;
}
145
147static inline DmrppCommon *dc(BaseType *btp)
148{
149 auto *dc = dynamic_cast<DmrppCommon*>(btp);
150 if (!dc)
151 throw BESInternalError(string("Expected a BaseType that was also a DmrppCommon instance (")
152 .append((btp) ? btp->name() : "unknown").append(")."), __FILE__, __LINE__);
153 return dc;
154}
155
156
161void DMZ::load_config_from_keys()
162{
163 // ########################################################################
164 // Loads the ELIDE_UNSUPPORTED_KEY (see top of file for key definition)
165 // And if it's set, and set to true, then we set the eliding flag to true.
166 d_elide_unsupported = TheBESKeys::TheKeys()->read_bool_key(ELIDE_UNSUPPORTED_KEY,false);
167}
168
174DMZ::DMZ(const string &file_name)
175{
176 load_config_from_keys();
177 parse_xml_doc(file_name);
178}
179
184void
185DMZ::parse_xml_doc(const string &file_name)
186{
187 std::ifstream stream(file_name);
188
189 // Free memory used by a previously parsed document.
190 d_xml_doc.reset();
191
192 // parse_ws_pcdata_single will include the space when it appears in a <Value> </Value>
193 // DAP Attribute element. jhrg 11/3/21
194 pugi::xml_parse_result result = d_xml_doc.load(stream, pugi::parse_default | pugi::parse_ws_pcdata_single);
195
196 if (!result)
197 throw BESInternalError(string("DMR++ parse error: ").append(result.description()), __FILE__, __LINE__);
198
199 if (!d_xml_doc.document_element())
200 throw BESInternalError("No DMR++ data present.", __FILE__, __LINE__);
201}
202
203
204
211bool flagged_as_unsupported_type(xml_node var_node, string &unsupported_flag) {
212 if (var_node == nullptr) {
213 throw BESInternalError(prolog + "Received null valued xml_node in the DMR++ XML document.", __FILE__, __LINE__);
214 }
215
216 // We'll start assuming it's not flagged as unsupported
217 bool is_unsupported_type = false;
218
219 // We know the unsupported flag is held in the fillValue attribute of the dmrpp:chunks element.
220 auto chunks = var_node.child("dmrpp:chunks");
221 if(!chunks) {
222 // No dmrpp:chunks? Then no fillValue and we can be done, it's supported.
223 return is_unsupported_type;
224 }
225
226 xml_attribute fillValue_attr = chunks.attribute("fillValue");
227 if(!fillValue_attr) {
228 // No fillValue attribute? Then we can be done, it's supported.
229 return is_unsupported_type;
230 }
231
232 // We found th fillValue attribute, So now we have to deal with its various tragic values...
233 if(is_eq(fillValue_attr.value(), UNSUPPORTED_STRING)){
234 // UNSUPPORTED_STRING is the older, indeterminate, tag which might label a truly
235 // unsupported VariableLengthString or it could be a labeling FixedLengthString.
236 // In order to find out we need to look in XML DOM to determine if this is an Array, and
237 // if so, to see if it's the FixedLengthString case:
238 // <dmrpp:FixedLengthStringArray string_length ... />
239 // This should be a child of var_node.
240
241 // Start by making it unsupported and then check each of the exceptions.
242 is_unsupported_type = true;
243
244 auto dim_node = var_node.child("Dim");
245 if(!dim_node) {
246 // No dims? Then this is a scalar String and it's cool.
247 // We dump the BS fillValue for one that makes some sense in Stringville
248 fillValue_attr.set_value("");
249 is_unsupported_type = false;
250 }
251 else {
252 // It's an array, so is it a FixedLengthStringArray??
253 auto flsa_node = var_node.child("dmrpp:FixedLengthStringArray");
254 if(flsa_node){
255 // FixedLengthStringArray arrays work!
256 // We dump the BS fillValue for one that makes some sense in Stringville
257 fillValue_attr.set_value("");
258 is_unsupported_type = false;
259 }
260 }
261 }
262 else if(is_eq(fillValue_attr.value(),UNSUPPORTED_VARIABLE_LENGTH_STRING)) {
263 unsupported_flag=fillValue_attr.value();
264 is_unsupported_type = true;
265 }
266 else if(is_eq(fillValue_attr.value(),UNSUPPORTED_ARRAY)){
267 unsupported_flag=fillValue_attr.value();
268 is_unsupported_type = true;
269 }
270 else if(is_eq(fillValue_attr.value(),UNSUPPORTED_COMPOUND)){
271 unsupported_flag=fillValue_attr.value();
272 is_unsupported_type = true;
273 }
274
275 return is_unsupported_type;
276}
277
278
279
289void
290DMZ::parse_xml_string(const string &source)
291{
292 pugi::xml_parse_result result = d_xml_doc.load_string(source.c_str());
293
294 if (!result)
295 throw BESInternalError(string("DMR++ parse error: ").append(result.description()), __FILE__, __LINE__);
296
297 if (!d_xml_doc.document_element())
298 throw BESInternalError("No DMR++ data present.", __FILE__, __LINE__);
299}
300
310void DMZ::process_dataset(DMR *dmr, const xml_node &xml_root)
311{
312 // Process the attributes
313 int required_attrs_found = 0; // there are 1
314 string href_attr;
315 bool href_trusted = false;
316 string dmrpp_version; // empty or holds a value if dmrpp::version is present
317 for (xml_attribute attr = xml_root.first_attribute(); attr; attr = attr.next_attribute()) {
318 if (is_eq(attr.name(), "name")) {
319 ++required_attrs_found;
320 dmr->set_name(attr.value());
321 }
322 else if (is_eq(attr.name(), "dapVersion")) {
323 dmr->set_dap_version(attr.value());
324 }
325 else if (is_eq(attr.name(), "dmrVersion")) {
326 dmr->set_dmr_version(attr.value());
327 }
328 else if (is_eq(attr.name(), "base")) {
329 dmr->set_request_xml_base(attr.value());
330 BESDEBUG(PARSER, prolog << "Dataset xml:base is set to '" << dmr->request_xml_base() << "'" << endl);
331 }
332 // The pugixml library does not use XML namespaces AFAIK. jhrg 11/2/21
333 else if (is_eq(attr.name(), "xmlns")) {
334 dmr->set_namespace(attr.value());
335 }
336 // This code does not use namespaces. By default, we assume the DMR++ elements
337 // all use the namespace prefix 'dmrpp'. jhrg 11/2/21
338 else if (is_eq(attr.name(), "dmrpp:href")) {
339 href_attr = attr.value();
340 }
341 else if (is_eq(attr.name(), "dmrpp:trust")) {
342 href_trusted = is_eq(attr.value(), "true");
343 }
344 else if (is_eq(attr.name(), "dmrpp:version")) {
345 dmrpp_version = attr.value();
346 }
347 // We allow other, non recognized attributes, so there is no 'else' jhrg 10/20/21
348 }
349
350 if (dmrpp_version.empty()) { // old style DMR++, set enable-kludge flag
351 DmrppRequestHandler::d_emulate_original_filter_order_behavior = true;
352 }
353 else {
354 auto dmrpp = dynamic_cast<DMRpp*>(dmr);
355 if (dmrpp) {
356 dmrpp->set_version(dmrpp_version);
357 }
358 }
359
360 if (required_attrs_found != 1)
361 throw BESInternalError("DMR++ XML dataset element missing one or more required attributes.", __FILE__, __LINE__);
362
363 if (href_attr.empty())
364 throw BESInternalError("DMR++ XML dataset element dmrpp:href is missing. ", __FILE__, __LINE__);
365
366 d_dataset_elem_href.reset(new http::url(href_attr, href_trusted));
367}
368
374void DMZ::process_dimension(D4Group *grp, const xml_node &dimension_node)
375{
376 string name_value;
377 string size_value;
378 for (xml_attribute attr = dimension_node.first_attribute(); attr; attr = attr.next_attribute()) {
379 if (is_eq(attr.name(), "name")) {
380 name_value = attr.value();
381 }
382 else if (is_eq(attr.name(), "size")) {
383 size_value = attr.value();
384 }
385 }
386
387 if (name_value.empty() || size_value.empty())
388 throw BESInternalError("The required attribute 'name' or 'size' was missing from a Dimension element.", __FILE__, __LINE__);
389
390 // This getter (dim_def) allocates a new object if needed.
391 try {
392 auto *dimension = new D4Dimension();
393 dimension->set_name(name_value);
394 dimension->set_size(size_value);
395 grp->dims()->add_dim_nocopy(dimension);
396 }
397 catch (Error &e) {
398 throw BESInternalError(e.get_error_message(), __FILE__, __LINE__);
399 }
400}
401
409void DMZ::process_dim(DMR *dmr, D4Group *grp, Array *array, const xml_node &dim_node)
410{
411 string name_value;
412 string size_value;
413 for (xml_attribute attr = dim_node.first_attribute(); attr; attr = attr.next_attribute()) {
414 if (is_eq(attr.name(), "name")) {
415 name_value = attr.value();
416 }
417 else if (is_eq(attr.name(), "size")) {
418 size_value = attr.value();
419 }
420 }
421
422 if (name_value.empty() && size_value.empty())
423 throw BESInternalError("Either 'size' or 'name' must be used in a Dim element.", __FILE__, __LINE__);
424 if (!name_value.empty() && !size_value.empty())
425 throw BESInternalError("Only one of 'size' and 'name' are allowed in a Dim element, but both were used.", __FILE__, __LINE__);
426
427 if (!size_value.empty()) {
428 BESDEBUG(PARSER, prolog << "Processing nameless Dim of size: " << stoll(size_value) << endl);
429 array->append_dim_ll(stoll(size_value));
430 }
431 else if (!name_value.empty()) {
432 BESDEBUG(PARSER, prolog << "Processing Dim with named Dimension reference: " << name_value << endl);
433
434 D4Dimension *dim;
435 if (name_value[0] == '/') // lookup the Dimension in the root group
436 dim = dmr->root()->find_dim(name_value);
437 else
438 // get enclosing Group and lookup Dimension there
439 dim = grp->find_dim(name_value);
440
441 if (!dim)
442 throw BESInternalError("The dimension '" + name_value + "' was not found while parsing the variable '" + array->name() + "'.",__FILE__,__LINE__);
443
444 array->append_dim(dim);
445 }
446}
447
448void DMZ::process_map(DMR *dmr, D4Group *grp, Array *array, const xml_node &map_node)
449{
450 string name_value;
451 string size_value;
452 for (xml_attribute attr = map_node.first_attribute(); attr; attr = attr.next_attribute()) {
453 if (is_eq(attr.name(), "name")) {
454 name_value = attr.value();
455 }
456 }
457
458 // All map names are FQNs. If we get one that isn't, assume it's within the most current group.
459 if (name_value[0] != '/')
460 name_value = grp->FQN() + name_value;
461
462 // The array variable that holds the data for the Map
463 Array *map_source = dmr->root()->find_map_source(name_value);
464
465 // In the SAX2 parser, we had 'strict' and 'permissive' modes. For Maps, permissive
466 // allowed the DAP variable for a Map to be missing so that users could request just
467 // the data with the maps. I'm implementing that behavior. Below is the original
468 // comment from DmrppParserSAX2.cc. jhrg 11/3/21
469
470 // Change: If the parser is in 'strict' mode (the default) and the Array named by
471 // the Map cannot be fond, it is an error. If 'strict' mode is false (permissive
472 // mode), then this is not an error. However, the Array referenced by the Map will
473 // be null. This is a change in the parser's behavior to accommodate requests for
474 // Arrays that include Maps that do not also include the Map(s) in the request.
475 // See https://opendap.atlassian.net/browse/HYRAX-98. jhrg 4/13/16
476
477 array->maps()->add_map(new D4Map(name_value, map_source));
478}
479
494void DMZ::process_variable(DMR *dmr, D4Group *group, Constructor *parent, const xml_node &var_node)
495{
496 if(!group){
497 throw BESInternalError(
498 prolog + "Received a null valued Group pointer!", __FILE__, __LINE__);
499 }
500
501 string unsupported_flag;
502 if(d_elide_unsupported && flagged_as_unsupported_type(var_node, unsupported_flag)){
503 // And in this way we elide the unsupported types - we don't process the DAP object
504 // if it's got the unsupported bits in fillValue
505 auto var_name = var_node.attribute("name");
506 auto var_type = var_node.name();
507 INFO_LOG(prolog + "Unsupported Type Encountered: " + var_type + " " + var_name.value() + "; flag: '" + unsupported_flag + "'\n");
508 return;
509 }
510
511 // Variables are declared using nodes with type names (e.g., <Float32...>)
512 // Variables are arrays if they have one or more <Dim...> child nodes.
513 Type t = get_type(var_node.name());
514
515 if(t == dods_group_c){ // Groups are special and handled elsewhere
516 throw BESInternalError(
517 prolog + "ERROR - The variable node to process is a Group type! "
518 "This is handled elsewhere, not here. Parser State Issue!!", __FILE__, __LINE__);
519 }
520
521 BaseType *btp;
522 if (has_dim_nodes(var_node)) {
523 // If it has Dim nodes then it's an array!
524 btp = add_array_variable(dmr, group, parent, t, var_node);
525 if (t == dods_structure_c || t == dods_sequence_c) {
526 if(btp->type() != dods_array_c || btp->var()->type() != t){
527 throw BESInternalError(
528 prolog + "Failed to create an array variable for " + var_node.name(), __FILE__, __LINE__);
529 }
530 // NB: For an array of a Constructor, add children to the Constructor, not the array
531 parent = dynamic_cast<Constructor*>(btp->var());
532 if(!parent){
533 throw BESInternalError(
534 prolog + "Failed to cast " + btp->var()->type_name() + " " + btp->name() +
535 " to an instance of Constructor." , __FILE__, __LINE__);
536 }
537 for (auto child = var_node.first_child(); child; child = child.next_sibling()) {
538 if (member_of(variable_elements, child.name()))
539 process_variable(dmr, group, parent, child);
540 }
541 }
542 }
543 else {
544 // Things not arrays must be scalars...
545 btp = add_scalar_variable(dmr, group, parent, t, var_node);
546 if (t == dods_structure_c || t == dods_sequence_c) {
547 if(btp->type() != t){
548 throw BESInternalError(
549 prolog + "Failed to create a scalar variable for " + var_node.name(), __FILE__, __LINE__);
550 }
551 parent = dynamic_cast<Constructor*>(btp);
552 if(!parent){
553 throw BESInternalError(
554 prolog + "Failed to cast " + btp->var()->type_name() + " " + btp->name() +
555 " to an instance of Constructor." , __FILE__, __LINE__);
556 }
557 for (auto child = var_node.first_child(); child; child = child.next_sibling()) {
558 if (member_of(variable_elements, child.name()))
559 process_variable(dmr, group, parent, child);
560 }
561 }
562 }
563
564 dc(btp)->set_xml_node(var_node);
565}
566
574BaseType *DMZ::build_variable(DMR *dmr, D4Group *group, Type t, const xml_node &var_node)
575{
576 if(!dmr->factory()){
577 throw BESInternalError(prolog + "ERROR - Received a DMR without a class factory!", __FILE__, __LINE__);
578 }
579
580 string name_value;
581 string enum_value;
582 for (xml_attribute attr = var_node.first_attribute(); attr; attr = attr.next_attribute()) {
583 if (is_eq(attr.name(), "name")) {
584 name_value = attr.value();
585 }
586 if (is_eq(attr.name(), "enum")) {
587 enum_value = attr.value();
588 }
589 }
590
591 if (name_value.empty())
592 throw BESInternalError("The variable 'name' attribute was missing.", __FILE__, __LINE__);
593
594 BaseType *btp = dmr->factory()->NewVariable(t, name_value);
595 if (!btp)
596 throw BESInternalError("Could not instantiate the variable ' "+ name_value +"'.", __FILE__, __LINE__);
597
598 btp->set_is_dap4(true);
599
600 // I cannot find a test on the code on enum. Is this part of code really tested? KY 2023-12-21
601 if (t == dods_enum_c) {
602 if (enum_value.empty())
603 throw BESInternalError("The variable ' " + name_value + "' lacks an 'enum' attribute.", __FILE__, __LINE__);
604
605 D4EnumDef *enum_def;
606 if (enum_value[0] == '/')
607 enum_def = dmr->root()->find_enum_def(enum_value);
608 else
609 enum_def = group->find_enum_def(enum_value);
610
611 if (!enum_def)
612 throw BESInternalError("Could not find the Enumeration definition '" + enum_value + "'.", __FILE__, __LINE__);
613
614 dynamic_cast<D4Enum&>(*btp).set_enumeration(enum_def);
615 }
616
617 return btp;
618}
619
630BaseType *DMZ::add_scalar_variable(DMR *dmr, D4Group *group, Constructor *parent, Type t, const xml_node &var_node)
631{
632 if(!group){
633 throw BESInternalError(prolog + "ERROR - Received a null valued Group pointer!", __FILE__, __LINE__);
634 }
635
636 BaseType *btp = build_variable(dmr, group, t, var_node);
637
638 // if parent is non-null, the code should add the new var to a constructor,
639 // else add the new var to the group.
640 if (parent)
641 parent->add_var_nocopy(btp);
642 else
643 group->add_var_nocopy(btp);
644
645 return btp;
646}
647
662BaseType *DMZ::add_array_variable(DMR *dmr, D4Group *group, Constructor *parent, Type t, const xml_node &var_node)
663{
664 if(!group){
665 throw BESInternalError(prolog + "ERROR - Received a null valued Group pointer!", __FILE__, __LINE__);
666 }
667
668 BaseType *btp = build_variable(dmr, group, t, var_node);
669
670 // Transform the scalar to an array
671 auto *array = static_cast<DmrppArray*>(dmr->factory()->NewVariable(dods_array_c, btp->name()));
672 array->set_is_dap4(true);
673 array->add_var_nocopy(btp);
674
675 // The SAX parser set up the parse of attributes here. For the thin DMR, we won't
676 // parse those from the DMR now. jhrg 10/21/21
677
678 // Now grab the dimension elements
679 for (auto child = var_node.first_child(); child; child = child.next_sibling()) {
680 if (is_eq(child.name(), "Dim")) {
681 process_dim(dmr, group, array, child);
682 }
683 else if (is_eq(child.name(), "Map")) {
684 process_map(dmr, group, array, child);
685 }
686 else if (is_eq(child.name(), DMRPP_FIXED_LENGTH_STRING_ARRAY_ELEMENT)) {
687 BESDEBUG(PARSER, prolog << "Variable has been marked with a " << DMRPP_FIXED_LENGTH_STRING_ARRAY_ELEMENT << endl);
688 // <dmrpp:FixedLengthStringArray string_length="8" pad="null"/>
689 array->set_is_flsa(true);
690 for (xml_attribute attr = child.first_attribute(); attr; attr = attr.next_attribute()) {
691 if (is_eq(attr.name(), DMRPP_FIXED_LENGTH_STRING_LENGTH_ATTR)) {
692 auto length = array->set_fixed_string_length(attr.value());
693 BESDEBUG(PARSER, prolog << "Fixed length string array string length: " << length << endl);
694 }
695 else if (is_eq(attr.name(), DMRPP_FIXED_LENGTH_STRING_PAD_ATTR)) {
696 string_pad_type pad = array->set_fixed_length_string_pad_type(attr.value());
697 BESDEBUG(PARSER, prolog << "Fixed length string array padding scheme: " << pad << " (" <<
698 array->get_fixed_length_string_pad_str() << ")" << endl);
699 }
700 }
701 }
702 else if(is_eq(child.name(), DMRPP_VLSA_ELEMENT)){
703 BESDEBUG(PARSER, prolog << "Variable has been marked with a " << DMRPP_VLSA_ELEMENT << endl);
704 array->set_is_vlsa(true);
705 }
706 }
707
708 if (parent)
709 parent->add_var_nocopy(array);
710 else
711 group->add_var_nocopy(array);
712
713 return array;
714}
715
724void DMZ::process_group(DMR *dmr, D4Group *parent, const xml_node &var_node)
725{
726 string name_value;
727 for (xml_attribute attr = var_node.first_attribute(); attr; attr = attr.next_attribute()) {
728 if (is_eq(attr.name(), "name")) {
729 name_value = attr.value();
730 }
731 }
732
733 if (name_value.empty())
734 throw BESInternalError("The required attribute 'name' was missing from a Group element.", __FILE__, __LINE__);
735
736 BaseType *btp = dmr->factory()->NewVariable(dods_group_c, name_value);
737 if (!btp)
738 throw BESInternalError("Could not instantiate the Group '" + name_value + "'.", __FILE__, __LINE__);
739
740 auto new_group = dynamic_cast<DmrppD4Group*>(btp);
741
742 // Need to set this to get the D4Attribute behavior in the type classes
743 // shared between DAP2 and DAP4. jhrg 4/18/13
744 new_group->set_is_dap4(true);
745
746 // link it up and change the current group
747 new_group->set_parent(parent);
748 parent->add_group_nocopy(new_group);
749
750 // Save the xml_node so that we can later find unprocessed XML without searching
751 new_group->set_xml_node(var_node);
752
753 // Now parse all the child nodes of the Group.
754 // NB: this is the same block of code as in build_thin_dmr(); refactor. jhrg 10/21/21
755 for (auto child = var_node.first_child(); child; child = child.next_sibling()) {
756 if (is_eq(child.name(), "Dimension")) {
757 process_dimension(new_group, child);
758 }
759 else if (is_eq(child.name(), "Group")) {
760 process_group(dmr, new_group, child);
761 }
762 else if (member_of(variable_elements, child.name())) {
763 process_variable(dmr, new_group, nullptr, child);
764 }
765 }
766}
767
774{
775 auto xml_root_node = d_xml_doc.first_child();
776
777 process_dataset(dmr, xml_root_node);
778
779 auto root_group = dmr->root();
780
781 auto *dg = dynamic_cast<DmrppD4Group*>(root_group);
782 if (!dg)
783 throw BESInternalError("Expected the root group to also be an instance of DmrppD4Group.", __FILE__, __LINE__);
784
785 dg->set_xml_node(xml_root_node);
786
787 for (auto child = xml_root_node.first_child(); child; child = child.next_sibling()) {
788 if (is_eq(child.name(), "Dimension")) {
789 process_dimension(dg, child);
790 }
791 else if (is_eq(child.name(), "Group")) {
792 process_group(dmr, dg, child);
793 }
794 // TODO Add EnumDef
795 else if (member_of(variable_elements, child.name())) {
796 process_variable(dmr, dg, nullptr, child);
797 }
798 }
799}
800
// This method checks whether any variable in this file can use the direct IO feature.
// If none can, the global dio flag is set to false. By checking the global flag,
// the fileout netCDF module can avoid checking every variable in the file to see
// whether direct IO can be applied.
805bool DMZ::set_up_all_direct_io_flags_phase_1(DMR *dmr) {
806
807 if (d_xml_doc == nullptr){
808 throw BESInternalError(prolog + "Received a null DMR pointer.", __FILE__, __LINE__);
809 }
810
811 bool dio_flag_value = set_up_direct_io_flag_phase_1(dmr->root());
812
813 dmr->set_global_dio_flag(dio_flag_value);
814 return dio_flag_value;
815
816}
817
818bool DMZ::set_up_direct_io_flag_phase_1(D4Group *group) {
819
820 bool ret_value = false;
821 for (auto i = group->var_begin(), e = group->var_end(); i != e; ++i) {
822 BESDEBUG("dmrpp","Inside set_up_direct_io_flag: var name is "<<(*i)->name()<<endl);
823 if ((*i)->type() == dods_array_c) {
824 if (true == set_up_direct_io_flag_phase_1(*i)) {
825 ret_value = true;
826 break;
827 }
828 }
829 }
830
831 if (ret_value == false) {
832 for (auto gi = group->grp_begin(), ge = group->grp_end(); gi != ge; ++gi) {
833 if (true == set_up_direct_io_flag_phase_1(*gi)) {
834 ret_value = true;
835 break;
836 }
837 }
838 }
839 return ret_value;
840
841}
842
843bool DMZ::set_up_direct_io_flag_phase_1(BaseType *btp) {
844
845 // goto the DOM tree node for this variable
846 xml_node var_node = get_variable_xml_node(btp);
847 if (var_node == nullptr)
848 throw BESInternalError("Could not find location of variable in the DMR++ XML document.", __FILE__, __LINE__);
849
850 auto chunks = var_node.child("dmrpp:chunks");
851 if(!chunks)
852 return false;
853
854 bool ret_value = false;
855 for (xml_attribute attr = chunks.first_attribute(); attr; attr = attr.next_attribute()) {
856 if (is_eq(attr.name(), "deflateLevel")) {
857 ret_value = true;
858 break;
859 }
860 }
861 return ret_value;
862}
863
864void DMZ::set_up_all_direct_io_flags_phase_2(DMR *dmr) {
865
866 if (d_xml_doc == nullptr){
867 throw BESInternalError(prolog + "Received a null DMR pointer.", __FILE__, __LINE__);
868 }
869
870 set_up_direct_io_flag_phase_2(dmr->root());
871
872}
873
874void DMZ::set_up_direct_io_flag_phase_2(D4Group *group) {
875
876 for (auto i = group->var_begin(), e = group->var_end(); i != e; ++i) {
877 if ((*i)->type() == dods_array_c)
878 set_up_direct_io_flag_phase_2((*i));
879 }
880
881 for (auto gi = group->grp_begin(), ge = group->grp_end(); gi != ge; ++gi)
882 set_up_direct_io_flag_phase_2((*gi));
883
884}
885
886void DMZ::set_up_direct_io_flag_phase_2(BaseType *btp) {
887
888 bool is_integer_float = false;
889 Array *t_a = nullptr;
890
891 Type t = btp->type();
892 if (t == dods_array_c) {
893 t_a=dynamic_cast<Array *>(btp);
894 Type t_var = t_a->var()->type();
895 if (libdap::is_simple_type(t_var) && t_var != dods_str_c && t_var != dods_url_c && t_var!= dods_enum_c && t_var!=dods_opaque_c)
896 is_integer_float = true;
897 }
898
899 // If the var is not an integer or float array, don't support the direct IO.
900 if (is_integer_float == false)
901 return;
902
903
904 // goto the DOM tree node for this variable
905 xml_node var_node = get_variable_xml_node(btp);
906 if (var_node == nullptr)
907 throw BESInternalError("Could not find location of variable in the DMR++ XML document.", __FILE__, __LINE__);
908
909 auto chunks = var_node.child("dmrpp:chunks");
910
911 // No chunks,no need to check the rest.
912 if(!chunks)
913 return;
914
915
916 bool has_deflate_filter = false;
917 string filter;
918 vector<unsigned int>deflate_levels;
919
920 bool is_le = false;
921
922 for (xml_attribute attr = chunks.first_attribute(); attr; attr = attr.next_attribute()) {
923 if (!has_deflate_filter && is_eq(attr.name(), "compressionType")) {
924 filter = attr.value();
925 if (filter.find("deflate") == string::npos)
926 break;
927 else
928 has_deflate_filter = true;
929 }
930 else if (has_deflate_filter && deflate_levels.empty()) {
931
932 if (is_eq(attr.name(), "deflateLevel")) {
933
934 string def_lev_str = attr.value();
935
936 // decompose the string.
937 vector<string> def_lev_str_vec = BESUtil::split(def_lev_str, ' ' );
938 for (const auto &def_lev:def_lev_str_vec)
939 deflate_levels.push_back(stoul(def_lev));
940 }
941
942 }
943 else if (is_eq(attr.name(),"byteOrder")) {
944 string endian_str = attr.value();
945 if (endian_str=="LE")
946 is_le = true;
947 }
948
949 else if (is_eq(attr.name(), "DIO") && is_eq(attr.value(),"off")) {
950 dc(btp)->set_disable_dio(true);
951 BESDEBUG(PARSER, prolog << "direct IO is disabled : the variable name is: " <<btp->name() << endl);
952 }
953 }
954
955 // If no deflate filter is used or the deflate_levels is not defined, cannot do the direct IO. return.
956 if (!has_deflate_filter || (deflate_levels.empty()))
957 return;
958
959 // If the datatype is not little-endian, cannot do the direct IO. return.
960 // The big-endian IEEE-floating-point data also needs byteswap. So we cannot do direct IO. KY 2024-03-03
961 if (!is_le)
962 return;
963
964 if (dc(btp)->is_disable_dio())
965 return;
966#if 0
967 // If the datatype is integer and this is not little-endian, cannot do the direct IO. return.
968 if (!is_le && is_integer_type(t_a->var()->type()))
969 return;
970#endif
971
972 // Now we need to read the first child of dmrpp:chunks to obtain the chunk sizes.
973 vector<unsigned long long>chunk_dim_sizes;
974 for (auto child = chunks.child("dmrpp:chunkDimensionSizes"); child; child = child.next_sibling()) {
975 if (is_eq(child.name(), "dmrpp:chunkDimensionSizes")) {
976 string chunk_sizes_str = child.child_value();
977 vector<string> chunk_sizes_str_vec = BESUtil::split(chunk_sizes_str, ' ' );
978 for (const auto &chunk_size:chunk_sizes_str_vec)
979 chunk_dim_sizes.push_back(stoull(chunk_size));
980 break;
981 }
982 }
983
984 // Since the deflate filter is always associated with the chunk storage,
985 // the chunkDimensionSizes should always exist for the direct IO case. If not, return.
986 if (chunk_dim_sizes.empty())
987 return;
988
989 // Now we need to count the number of children with the name <dmrpp:chunk> inside the <dmrpp:chunks>.
990 size_t num_chunks_children = 0;
991 for (auto child = chunks.first_child(); child; child = child.next_sibling())
992 num_chunks_children++;
993
994 // If the only child is dmrpp::chunkDimensionSizes, no chunk is found. This is not direct IO case.
995 if (num_chunks_children == 1)
996 return;
997
998 // Now we need to check the special case if the chunk size is greater than the dimension size for any dimension.
999 // If this is the case, we will not use the direct chunk IO since netCDF-4 doesn't allow this.
1000 // TODO later, if the dimension is unlimited, this restriction can be lifted. Current dmrpp doesn't store the
1001 // unlimited dimension information.
1002 vector <unsigned long long>dim_sizes;
1003 Array::Dim_iter p = t_a->dim_begin();
1004 while (p != t_a->dim_end()) {
1005 dim_sizes.push_back((unsigned long long)(t_a->dimension_size_ll(p)));
1006 p++;
1007 }
1008
1009 bool chunk_less_dim = true;
1010 if (chunk_dim_sizes.size() == dim_sizes.size()) {
1011 for (unsigned int i = 0; i<dim_sizes.size(); i++) {
1012 if (chunk_dim_sizes[i] > dim_sizes[i]) {
1013 chunk_less_dim = false;
1014 break;
1015 }
1016 }
1017 }
1018 else
1019 chunk_less_dim = false;
1020
1021 if (!chunk_less_dim)
1022 return;
1023
1024 // Another special case is that some chunks are only filled with the fvalues. This case cannot be handled by direct IO.
1025 // First calculate the number of logical chunks.
1026 // Also up to this step, the size of chunk_dim_sizes must be the same as the size of dim_sizes. No need to double check.
1027 size_t num_logical_chunks = 1;
1028 for (unsigned int i = 0; i<dim_sizes.size(); i++)
1029 num_logical_chunks *=(size_t)ceil((float)dim_sizes[i] / (float)chunk_dim_sizes[i]);
1030 if (num_logical_chunks != (num_chunks_children-1))
1031 return;
1032
1033 // Now we should provide the variable info for the define mode inside the fileout netCDF module.
1034 // The chunk offset/length etc. information will be provided after load_chunk() is called in the read().
1035
1036 BESDEBUG(PARSER, prolog << "Can do direct IO: the variable name is: " <<btp->name() << endl);
1037
1038 // Adding the dio information in the variable level. This information is needed for the define mode in the fileout netcdf module.
1039 // Fill in the chunk information so that the fileout netcdf can retrieve.
1040 Array::var_storage_info dmrpp_vs_info;
1041
1042 // Add the filter info.
1043 dmrpp_vs_info.filter = filter;
1044
1045 // Provide the deflate compression levels.
1046 for (const auto &def_lev:deflate_levels)
1047 dmrpp_vs_info.deflate_levels.push_back(def_lev);
1048
1049 // Provide the chunk dimension sizes.
1050 for (const auto &chunk_dim:chunk_dim_sizes)
1051 dmrpp_vs_info.chunk_dims.push_back(chunk_dim);
1052
1053 t_a->set_var_storage_info(dmrpp_vs_info);
1054 t_a->set_dio_flag();
1055
1056}
1057
1058
1059
1068void DMZ::process_attribute(D4Attributes *attributes, const xml_node &dap_attr_node)
1069{
1070 string name_value;
1071 string type_value;
1072 for (xml_attribute attr = dap_attr_node.first_attribute(); attr; attr = attr.next_attribute()) {
1073 if (is_eq(attr.name(), "name")) {
1074 name_value = attr.value();
1075 }
1076 if (is_eq(attr.name(), "type")) {
1077 type_value = attr.value();
1078 }
1079 }
1080
1081 if (name_value.empty() || type_value.empty())
1082 throw BESInternalError("The required attribute 'name' or 'type' was missing from an Attribute element.", __FILE__, __LINE__);
1083
1084 if (type_value == "Container") {
1085 // Make the new attribute container and add it to current container
1086 auto *dap_attr_cont = new D4Attribute(name_value, attr_container_c);
1087 attributes->add_attribute_nocopy(dap_attr_cont);
1088 // In this call, 'attributes()' will allocate the D4Attributes object
1089 // that will hold the container's attributes.
1090 // Test to see if there really are child "Attribute" nodes - empty containers
1091 // are allowed. jhrg 11/4/21
1092 if (dap_attr_node.first_child()) {
1093 for (auto attr_node: dap_attr_node.children("Attribute")) {
1094 process_attribute(dap_attr_cont->attributes(), attr_node);
1095 }
1096 }
1097 }
1098 else if (type_value == "OtherXML") {
1099 // TODO Add support for OtherXML
1100 }
1101 else {
1102 // Make the D4Attribute and add it to the D4Attributes attribute container
1103 auto *attribute = new D4Attribute(name_value, StringToD4AttributeType(type_value));
1104 attributes->add_attribute_nocopy(attribute);
1105 // Process one or more Value elements
1106 for (auto value_elem = dap_attr_node.first_child(); value_elem; value_elem = value_elem.next_sibling()) {
1107 if (is_eq(value_elem.name(), "Value")) {
1108 attribute->add_value(value_elem.child_value()); // returns the text of the first data node
1109 }
1110 }
1111 }
1112}
1113
1131void DMZ::build_basetype_chain(BaseType *btp, stack<BaseType*> &bt)
1132{
1133 auto parent = btp->get_parent();
1134 bt.push(btp);
1135
1136 // The parent must be non-null and not the root group (the root group has no parent).
1137 if (parent && !(parent->type() == dods_group_c && parent->get_parent() == nullptr))
1138 build_basetype_chain(parent, bt);
1139}
1140
1141xml_node DMZ::get_variable_xml_node_helper(const xml_node &/*parent_node*/, stack<BaseType*> &/*bt*/)
1142{
1143#if !USE_CACHED_XML_NODE
1144 // When we have an array of Structure or Sequence, both the Array and the
1145 // Structure BaseType are pushed on the stack. This happens because, for
1146 // constructors, other variables reference them as a parent node (while that's
1147 // not the case for the cardinal types held by an array). Here we pop the
1148 // Array off the stack. A better solution might be to better control what gets
1149 // pushed by build_basetype_chain(). jhrg 10/24/21
1150 if (bt.top()->type() == dods_array_c && bt.top()->var()->is_constructor_type())
1151 bt.pop();
1152
1153 // The DMR XML stores both scalar and array variables using XML elements
1154 // named for the cardinal type. For an array, that is the type of the
1155 // element, so we use BaseType->var()->type_name() for an Array.
1156 string type_name = bt.top()->type() == dods_array_c ? bt.top()->var()->type_name(): bt.top()->type_name();
1157 string var_name = bt.top()->name();
1158 bt.pop();
1159
1160 // Now look for the node with the correct element type and matching name
1161 for (auto node = parent_node.child(type_name.c_str()); node; node = node.next_sibling()) {
1162 for (xml_attribute attr = node.first_attribute(); attr; attr = attr.next_attribute()) {
1163 if (is_eq(attr.name(), "name") && is_eq(attr.value(), var_name.c_str())) {
1164 // if this is the last BaseType on the stack, return the node
1165 if (bt.empty())
1166 return node;
1167 else
1168 return get_variable_xml_node_helper(node, bt);
1169 }
1170 }
1171 }
1172
1173 return xml_node(); // return an empty node
1174#else
1175 return xml_node(); // return an empty node
1176#endif
1177}
1178
1185xml_node DMZ::get_variable_xml_node(BaseType *btp)
1186{
1187#if USE_CACHED_XML_NODE
1188 auto node = dc(btp)->get_xml_node();
1189 if (node == nullptr)
1190 throw BESInternalError(string("The xml_node for '").append(btp->name()).append("' was not recorded."), __FILE__, __LINE__);
1191
1192 return node;
1193#else
1194 // load the BaseType objects onto a stack, since we start at the leaf and
1195 // go backward using its 'parent' pointer, the order of BaseTypes on the
1196 // stack will match the order in the hierarchy of the DOM tree.
1197 stack<BaseType*> bt;
1198 build_basetype_chain(btp, bt);
1199
1200 xml_node dataset = d_xml_doc.first_child();
1201 if (!dataset || !is_eq(dataset.name(), "Dataset"))
1202 throw BESInternalError("No DMR++ has been parsed.", __FILE__, __LINE__);
1203
1204 auto node = get_variable_xml_node_helper(dataset, bt);
1205 return node;
1206#endif
1207}
1208
1214
1226void
1227DMZ::load_attributes(BaseType *btp)
1228{
1229 if (dc(btp)->get_attributes_loaded())
1230 return;
1231
1232 load_attributes(btp, get_variable_xml_node(btp));
1233
1234 // TODO Remove redundant
1235 dc(btp)->set_attributes_loaded(true);
1236
1237 switch (btp->type()) {
1238 // When we load attributes for an Array, the set_send_p() method
1239 // is called for its 'template' variable, but that call fails (and
1240 // the attributes are already loaded). This block marks the attributes
1241 // as loaded so the 'var_node == nullptr' exception above does not
1242 // get thrown. Maybe a better fix would be to mark 'child variables'
1243 // as having their attributes loaded. jhrg 11/16/21
1244 case dods_array_c: {
1245 dc(btp->var())->set_attributes_loaded(true);
1246 break;
1247 }
1248
1249 // FIXME There are no tests for this code. The above block for Array
1250 // was needed, so it seems likely that this will be too, but ...
1251 // jhrg 11/16/21
1252 case dods_structure_c:
1253 case dods_sequence_c:
1254 case dods_grid_c: {
1255 auto *c = dynamic_cast<Constructor*>(btp);
1256 if (c) {
1257 for (auto i = c->var_begin(), e = c->var_end(); i != e; i++) {
1258 if ((*i)->type() == dods_array_c)
1259 dc((*i)->var())->set_attributes_loaded(true);
1260 else
1261 dc(*i)->set_attributes_loaded(true);
1262 }
1263 break;
1264 }
1265 }
1266
1267 default:
1268 break;
1269 }
1270}
1271
1277void
1278DMZ::load_attributes(BaseType *btp, xml_node var_node) const
1279{
1280 if (dc(btp)->get_attributes_loaded())
1281 return;
1282
1283 // Attributes for this node will be held in the var_node siblings.
1284 // NB: Make an explict call to the BaseType implementation in case
1285 // the attributes() method is specialized for this DMR++ code to
1286 // trigger a lazy-load of the variables' attributes. jhrg 10/24/21
1287 // Could also use BaseType::set_attributes(). jhrg
1288 auto attributes = btp->BaseType::attributes();
1289 for (auto child = var_node.first_child(); child; child = child.next_sibling()) {
1290 if (is_eq(child.name(), "Attribute")) {
1291 process_attribute(attributes, child);
1292 }
1293 }
1294
1295 dc(btp)->set_attributes_loaded(true);
1296}
1297
1302void
1303DMZ::load_attributes(Constructor *constructor)
1304{
1305 load_attributes(constructor, get_variable_xml_node(constructor));
1306 for (auto i = constructor->var_begin(), e = constructor->var_end(); i != e; ++i) {
1307 // Groups are not allowed inside a Constructor
1308 if((*i)->type() == dods_group_c){
1309 throw BESInternalError(
1310 prolog + "Found a Group as a member of a " + constructor->type_name() + " data type. " +
1311 "This violates the DAP4 data model and cannot be processed!", __FILE__, __LINE__);
1312 }
1313 load_attributes(*i);
1314 }
1315}
1316
1317void
1318DMZ::load_attributes(D4Group *group) {
1319 // The root group is special; look for its DAP Attributes in the Dataset element
1320 if (group->get_parent() == nullptr) {
1321 xml_node dataset = d_xml_doc.child("Dataset");
1322 if (!dataset)
1323 throw BESInternalError("Could not find the 'Dataset' element in the DMR++ XML document.", __FILE__, __LINE__);
1324 load_attributes(group, dataset);
1325 }
1326 else {
1327 load_attributes(group, get_variable_xml_node(group));
1328 }
1329
1330 for (auto i = group->var_begin(), e = group->var_end(); i != e; ++i) {
1331 // Even though is_constructor_type() returns true for instances of D4Group,
1332 // Groups are kept under a separate container from variables because they
1333 // have a different function than the Structure and Sequence types (Groups
1334 // never hold data).
1335 if((*i)->type() == dods_group_c){
1336 throw BESInternalError(
1337 prolog + "Found a Group instance in the variables collection for Group " + group->name() + ". " +
1338 "This violates the DAP4 data model and cannot be processed!", __FILE__, __LINE__);
1339 }
1340 load_attributes(*i);
1341 }
1342
1343 for (auto i = group->grp_begin(), e = group->grp_end(); i != e; ++i) {
1344 load_attributes(*i);
1345 }
1346}
1347
1348void DMZ::load_all_attributes(libdap::DMR *dmr)
1349{
1350 if(d_xml_doc == nullptr){
1351 throw BESInternalError(prolog + "Received a null DMR pointer.", __FILE__, __LINE__);
1352 }
1353 load_attributes(dmr->root());
1354}
1355
1357
1362
1372void
1373DMZ::process_compact(BaseType *btp, const xml_node &compact)
1374{
1375
1376 dc(btp)->set_compact(true);
1377
1378 auto char_data = compact.child_value();
1379 if (!char_data)
1380 throw BESInternalError("The dmrpp::compact is missing data values.",__FILE__,__LINE__);
1381
1382 std::vector <u_int8_t> decoded = base64::Base64::decode(char_data);
1383
1384 // Current not support structure, sequence and grid for compact storage.
1385 if (btp->type()== dods_structure_c || btp->type() == dods_sequence_c || btp->type() == dods_grid_c)
1386 throw BESInternalError("The dmrpp::compact element must be the child of an array or a scalar variable", __FILE__, __LINE__);
1387
1388 // Obtain the datatype for array and scalar.
1389 Type dtype =btp->type();
1390 bool is_array_subset = false;
1391 if (dtype == dods_array_c) {
1392 auto *da = dynamic_cast<DmrppArray *>(btp);
1393 if (da->is_projected())
1394 is_array_subset = true;
1395 else
1396 dtype = btp->var()->type();
1397 }
1398
1399 if (is_array_subset) {
1400 auto *da = dynamic_cast<DmrppArray *>(btp);
1401 process_compact_subset(da,decoded);
1402 return;
1403 }
1404
1405 switch (dtype) {
1406 case dods_array_c:
1407 throw BESInternalError("DMR++ document fail: An Array may not be the template for an Array.", __FILE__, __LINE__);
1408
1409 case dods_byte_c:
1410 case dods_char_c:
1411 case dods_int8_c:
1412 case dods_uint8_c:
1413 case dods_int16_c:
1414 case dods_uint16_c:
1415 case dods_int32_c:
1416 case dods_uint32_c:
1417 case dods_int64_c:
1418 case dods_uint64_c:
1419
1420 case dods_enum_c:
1421
1422 case dods_float32_c:
1423 case dods_float64_c:
1424 btp->val2buf(reinterpret_cast<void *>(decoded.data()));
1425 btp->set_read_p(true);
1426 break;
1427
1428 case dods_str_c:
1429 case dods_url_c: {
1430
1431 std::string str(decoded.begin(), decoded.end());
1432 if (btp->type() == dods_array_c) {
1433 auto *array = dynamic_cast<DmrppArray *>(btp);
1434 if(!array){
1435 throw BESInternalError("Internal state error. Object claims to be array but is not.",__FILE__,__LINE__);
1436 }
1437 if(array->is_flsa()){
1438 // It's an array of Fixed Length Strings
1439 auto fls_length = array->get_fixed_string_length();
1440 auto pad_type = array->get_fixed_length_string_pad();
1441 auto str_start = reinterpret_cast<char *>(decoded.data());
1442 vector<string> fls_values;
1443 while(fls_values.size() < btp->length_ll()){
1444 string aValue = DmrppArray::ingest_fixed_length_string(str_start,fls_length, pad_type);
1445 fls_values.emplace_back(aValue);
1446 str_start += fls_length;
1447 }
1448 array->set_value(fls_values, (int) fls_values.size());
1449 array->set_read_p(true);
1450 }
1451 else {
1452 // It's an array of Variable Length Strings
1453 throw BESInternalError("Variable Length Strings are not yet supported.",__FILE__,__LINE__);
1454 }
1455 }
1456 else {// Scalar
1457 if(btp->type() == dods_str_c) {
1458 auto *st = static_cast<DmrppStr *>(btp);
1459 st->val2buf(&str);
1460 st->set_read_p(true);
1461 }
1462 else {
1463 auto *st = static_cast<DmrppUrl *>(btp);
1464 st->val2buf(&str);
1465 st->set_read_p(true);
1466 }
1467
1468 }
1469 break;
1470 }
1471
1472 default:
1473 throw BESInternalError("Unsupported COMPACT storage variable type in the drmpp handler.", __FILE__, __LINE__);
1474 case dods_null_c:
1475 break;
1476 case dods_structure_c:
1477 break;
1478 case dods_sequence_c:
1479 break;
1480 case dods_grid_c:
1481 break;
1482 case dods_opaque_c:
1483 break;
1484 case dods_group_c:
1485 break;
1486 }
1487}
1488
1489void DMZ::process_compact_subset(DmrppArray *da, std::vector<u_int8_t> &decoded) {
1490
1491 if (da->var()->type() == dods_str_c || da->var()->type() == dods_url_c)
1492 throw BESInternalError("Currently we don't support the subset for the compacted array of string",__FILE__,__LINE__);
1493
1494 int64_t num_buf_bytes = da->width_ll(true);
1495 vector<unsigned char> buf_bytes;
1496 buf_bytes.resize(num_buf_bytes);
1497 vector<unsigned long long> da_dims = da->get_shape(false);
1498 unsigned long subset_index = 0;
1499 vector<unsigned long long> subset_pos;
1500 handle_subset(da,da->dim_begin(),subset_index, subset_pos,buf_bytes,decoded);
1501
1502 da->val2buf(reinterpret_cast<void *>(buf_bytes.data()));
1503
1504 da->set_read_p(true);
1505}
1506
1507void DMZ::process_vlsa(libdap::BaseType *btp, const pugi::xml_node &vlsa_element)
1508{
1509 //---------------------------------------------------------------------------
1510 // Input Sanitization
1511 // We do the QC here and not in all the functions called, like endlessly...
1512 //
1513 if (btp->type() != dods_array_c) {
1514 throw BESInternalError(prolog + "Received an unexpected "+ btp->type_name() +
1515 " Expected an instance of DmrppArray!", __FILE__, __LINE__);
1516 }
1517 auto *array = dynamic_cast<DmrppArray *>(btp);
1518 if (!array) {
1519 throw BESInternalError("Internal state error. "
1520 "Object claims to be array but is not.", __FILE__, __LINE__);
1521 }
1522 if(array->var()->type() != dods_str_c && array->var()->type() != dods_url_c){
1523 throw BESInternalError(prolog + "Internal state error. "
1524 "Expected array of dods_str_c, got " +
1525 array->var()->type_name(), __FILE__, __LINE__);
1526 }
1527
1528 vector<string>entries;
1529 vlsa::read(vlsa_element, entries);
1530
1531 array->set_is_vlsa(true);
1532 array->set_value(entries, (int) entries.size());
1533 array->set_read_p(true);
1534}
1535
1536void
1537DMZ::process_missing_data(BaseType *btp, const xml_node &missing_data)
1538{
1539 BESDEBUG(PARSER, prolog << "Coming to process_missing_data() " << endl);
1540 dc(btp)->set_missing_data(true);
1541
1542 auto char_data = missing_data.child_value();
1543 if (!char_data)
1544 throw BESInternalError("The dmrpp::missing_data doesn't contain missing data values.",__FILE__,__LINE__);
1545
1546 std::vector <u_int8_t> decoded = base64::Base64::decode(char_data);
1547
1548 if (btp->type() != dods_array_c && btp->type() !=dods_byte_c)
1549 throw BESInternalError("The dmrpp::missing_data element must be the child of an array or a unsigned char scalar variable", __FILE__, __LINE__);
1550
1551 if (btp->type() == dods_byte_c) {
1552 auto db = dynamic_cast<DmrppByte *>(btp);
1553 db->set_value(decoded[0]);
1554 db->set_read_p(true);
1555 return;
1556 }
1557 auto *da = dynamic_cast<DmrppArray *>(btp);
1558
1559 vector<Bytef> result_bytes;
1560
1561 // We need to obtain the total buffer size to retrieve the whole array.
1562 // We cannot use width_ll() since it will return the number of selected elements.
1563 auto result_size = (uLongf)(da->get_size(false) *da->prototype()->width());
1564 result_bytes.resize(result_size);
1565
1566 if (da->get_size(false) == 1)
1567 memcpy(result_bytes.data(),decoded.data(),result_size);
1568 else {
1569 int retval = uncompress(result_bytes.data(), &result_size, decoded.data(), decoded.size());
1570 if (retval != 0)
1571 throw BESInternalError("The dmrpp::missing_data - fail to uncompress the mssing data.", __FILE__, __LINE__);
1572 }
1573
1574 if (da->is_projected()) {
1575
1576 int64_t num_buf_bytes = da->width_ll(true);
1577 vector<unsigned char> buf_bytes;
1578 buf_bytes.resize(num_buf_bytes);
1579 vector<unsigned long long> da_dims = da->get_shape(false);
1580 unsigned long subset_index = 0;
1581 vector<unsigned long long> subset_pos;
1582 handle_subset(da,da->dim_begin(),subset_index, subset_pos,buf_bytes,result_bytes);
1583
1584 da->val2buf(reinterpret_cast<void *>(buf_bytes.data()));
1585
1586 }
1587 else
1588 da->val2buf(reinterpret_cast<void *>(result_bytes.data()));
1589
1590 da->set_read_p(true);
1591
1592}
1593
1594bool
1595DMZ::supported_special_structure_type_internal(Constructor *var_ctor) {
1596
1597 bool ret_value = true;
1598 Constructor::Vars_iter vi = var_ctor->var_begin();
1599 Constructor::Vars_iter ve = var_ctor->var_end();
1600 for (; vi != ve; vi++) {
1601
1602 BaseType *bt = *vi;
1603 Type t_bt = bt->type();
1604
1605 // Only support array or scalar of float/int/string.
1606 if (libdap::is_simple_type(t_bt) == false) {
1607
1608 if (t_bt != dods_array_c) {
1609 ret_value = false;
1610 break;
1611 }
1612 else {
1613 auto t_a = dynamic_cast<Array *>(bt);
1614 Type t_array_var = t_a->var()->type();
1615 if (!libdap::is_simple_type(t_array_var) || t_array_var == dods_url_c || t_array_var == dods_enum_c || t_array_var==dods_opaque_c) {
1616 ret_value = false;
1617 break;
1618 }
1619 }
1620 }
1621 else if (t_bt == dods_url_c || t_bt == dods_enum_c || t_bt==dods_opaque_c) {
1622 ret_value = false;
1623 break;
1624 }
1625 }
1626
1627 return ret_value;
1628
1629}
1630
1631bool
1632DMZ::supported_special_structure_type(BaseType *btp)
1633{
1634 bool ret_value = false;
1635 Type t = btp->type();
1636 if ((t == dods_array_c && btp->var()->type() == dods_structure_c) || t==dods_structure_c) {
1637 Constructor *var_constructor = nullptr;
1638 if (t==dods_structure_c)
1639 var_constructor = dynamic_cast<Constructor*>(btp);
1640 else
1641 var_constructor = dynamic_cast<Constructor*>(btp->var());
1642 if (!var_constructor){
1643 throw BESInternalError(
1644 prolog + "Failed to cast " + btp->var()->type_name() + " " + btp->name() +
1645 " to an instance of Constructor." , __FILE__, __LINE__);
1646 }
1647
1648 ret_value = supported_special_structure_type_internal(var_constructor);
1649
1650 }
1651 return ret_value;
1652
1653}
1654
1655void
1656DMZ::process_special_structure_data(BaseType *btp, const xml_node &special_structure_data)
1657{
1658 BESDEBUG(PARSER, prolog << "Coming to process_special_structure_data() " << endl);
1659
1660 if (supported_special_structure_type(btp) == false)
1661 throw BESInternalError("The dmrpp::the datatype is not a supported special structure variable", __FILE__, __LINE__);
1662
1663 auto char_data = special_structure_data.child_value();
1664 if (!char_data)
1665 throw BESInternalError("The dmrpp::special_structure_data doesn't contain special structure data values.",__FILE__,__LINE__);
1666
1667 std::vector <u_int8_t> values = base64::Base64::decode(char_data);
1668 size_t total_value_size = values.size();
1669
1670 if(btp->type() == dods_array_c) {
1671
1672 auto ar = dynamic_cast<DmrppArray *>(btp);
1673 if(ar->is_projected())
1674 throw BESInternalError("The dmrpp::currently we don't support subsetting of special_structure_data.",__FILE__,__LINE__);
1675
1676 int64_t nelms = ar->length_ll();
1677 size_t values_offset = 0;
1678
1679 for (int64_t element = 0; element < nelms; ++element) {
1680
1681 auto dmrpp_s = dynamic_cast<DmrppStructure*>(ar->var()->ptr_duplicate());
1682 if(!dmrpp_s)
1683 throw InternalErr(__FILE__, __LINE__, "Cannot obtain the structure pointer.");
1684
1685 process_special_structure_data_internal(dmrpp_s, values, total_value_size, values_offset);
1686 ar->set_vec_ll((uint64_t)element,dmrpp_s);
1687 delete dmrpp_s;
1688 }
1689 }
1690 else {
1691
1692 size_t values_offset = 0;
1693 auto dmrpp_s = dynamic_cast<DmrppStructure*>(btp);
1694 if(!dmrpp_s)
1695 throw InternalErr(__FILE__, __LINE__, "Cannot obtain the structure pointer.");
1696 process_special_structure_data_internal(dmrpp_s, values , total_value_size, values_offset);
1697 }
1698
1699 btp->set_read_p(true);
1700
1701}
1702
1703void DMZ::process_special_structure_data_internal(DmrppStructure * dmrpp_s, std::vector<u_int8_t> &values , size_t total_value_size, size_t & values_offset){
1704
1705 Constructor::Vars_iter vi = dmrpp_s->var_begin();
1706 Constructor::Vars_iter ve = dmrpp_s->var_end();
1707
1708 for (; vi != ve; vi++) {
1709 BaseType *bt = *vi;
1710 Type t_bt = bt->type();
1711 if (libdap::is_simple_type(t_bt) && t_bt != dods_str_c && t_bt != dods_url_c && t_bt!= dods_enum_c && t_bt!=dods_opaque_c) {
1712
1713 BESDEBUG("dmrpp", "var name is: " << bt->name() << "'" << endl);
1714 BESDEBUG("dmrpp", "var values_offset is: " << values_offset << "'" << endl);
1715 bt->val2buf(values.data() + values_offset);
1716 values_offset += bt->width_ll();
1717 }
1718 else if (t_bt == dods_str_c) {
1719 BESDEBUG("dmrpp", "var string name is: " << bt->name() << "'" << endl);
1720 BESDEBUG("dmrpp", "var string values_offset is: " << values_offset << "'" << endl);
1721 if (total_value_size < values_offset)
1722 throw InternalErr(__FILE__, __LINE__, "The offset of the retrieved value is out of the boundary.");
1723 size_t rest_buf_size = total_value_size - values_offset;
1724 u_int8_t* start_pointer = values.data() + values_offset;
1725 vector<char>temp_buf;
1726 temp_buf.resize(rest_buf_size);
1727 memcpy(temp_buf.data(),(void*)start_pointer,rest_buf_size);
1728 // find the index of first ";", the separator
1729 size_t string_stop_index =0;
1730 vector<char> string_value;
1731 for (size_t i = 0; i <rest_buf_size; i++) {
1732 if(temp_buf[i] == ';') {
1733 string_stop_index = i;
1734 break;
1735 }
1736 else
1737 string_value.push_back(temp_buf[i]);
1738 }
1739 string encoded_str(string_value.begin(),string_value.end());
1740 vector <u_int8_t> decoded_str = base64::Base64::decode(encoded_str);
1741 vector <char> decoded_vec;
1742 decoded_vec.resize(decoded_str.size());
1743 memcpy(decoded_vec.data(),(void*)decoded_str.data(),decoded_str.size());
1744 string final_str(decoded_vec.begin(),decoded_vec.end());
1745 bt->val2buf(&final_str);
1746 values_offset = values_offset + string_stop_index+1;
1747 }
1748
1749 else if (t_bt == dods_array_c) {
1750 BESDEBUG("dmrpp", "var array name is: " << bt->name() << "'" << endl);
1751 BESDEBUG("dmrpp", "var array values_offset is: " << values_offset << "'" << endl);
1752
1753 auto t_a = dynamic_cast<Array *>(bt);
1754 Type ar_basetype = t_a->var()->type();
1755 if (libdap::is_simple_type(ar_basetype) && ar_basetype != dods_str_c && ar_basetype != dods_url_c && ar_basetype!= dods_enum_c && ar_basetype!=dods_opaque_c) {
1756 bt->val2buf(values.data() + values_offset);
1757 values_offset += bt->width_ll();
1758 }
1759 else if (ar_basetype == dods_str_c) {
1760
1761 if(total_value_size < values_offset)
1762 throw InternalErr(__FILE__, __LINE__, "The offset of the retrieved value is out of the boundary.");
1763
1764 size_t rest_buf_size = total_value_size - values_offset;
1765 u_int8_t* start_pointer = values.data() + values_offset;
1766 vector<char>temp_buf;
1767 temp_buf.resize(rest_buf_size);
1768 memcpy(temp_buf.data(),(void*)start_pointer,rest_buf_size);
1769
1770 int64_t num_ar_elems = t_a->length_ll();
1771
1772 // We need to create a vector of string to pass the string array.
1773 // Each string's encoded value is separated by ';'.
1774 vector<string> encoded_str;
1775 encoded_str.resize(num_ar_elems);
1776
1777 unsigned int str_index = 0;
1778 size_t string_stop_index = 0;
1779 for (size_t i = 0; i <rest_buf_size; i++) {
1780 if(temp_buf[i] != ';')
1781 encoded_str[str_index].push_back(temp_buf[i]);
1782 else {
1783 str_index++;
1784 if (str_index == num_ar_elems) {
1785 string_stop_index = i;
1786 break;
1787 }
1788 }
1789 }
1790
1791 vector<string> final_str;
1792 final_str.resize(num_ar_elems);
1793
1794 // decode the encoded string
1795 for (size_t i = 0; i <num_ar_elems; i++) {
1796
1797 string temp_encoded_str(encoded_str[i].begin(),encoded_str[i].end());
1798 vector <u_int8_t> decoded_str = base64::Base64::decode(temp_encoded_str);
1799 vector <char> decoded_vec;
1800 decoded_vec.resize(decoded_str.size());
1801 memcpy(decoded_vec.data(),(void*)decoded_str.data(),decoded_str.size());
1802 string temp_final_str(decoded_vec.begin(),decoded_vec.end());
1803 final_str[i] = temp_final_str;
1804 }
1805
1806 t_a->set_value_ll(final_str,num_ar_elems);
1807 values_offset = values_offset + string_stop_index+1;
1808
1809 }
1810 else
1811 throw InternalErr(__FILE__, __LINE__, "The base type of this structure is not integer or float or string. Currently it is not supported.");
1812 }
1813 }
1814 dmrpp_s->set_read_p(true);
1815
1816}
1817
1818
1833void DMZ::process_chunk(DmrppCommon *dc, const xml_node &chunk) const
1834{
1835 string href;
1836 string trust;
1837 string offset;
1838 string size;
1839 string chunk_position_in_array;
1840 string filter_mask;
1841 bool href_trusted = false;
1842
1843 for (xml_attribute attr = chunk.first_attribute(); attr; attr = attr.next_attribute()) {
1844 if (is_eq(attr.name(), "offset")) {
1845 offset = attr.value();
1846 }
1847 else if (is_eq(attr.name(), "nBytes")) {
1848 size = attr.value();
1849 }
1850 else if (is_eq(attr.name(), "chunkPositionInArray")) {
1851 chunk_position_in_array = attr.value();
1852 }
1853 else if (is_eq(attr.name(), "fm")) {
1854 filter_mask = attr.value();
1855 }
1856 else if (is_eq(attr.name(), "href")) {
1857 href = attr.value();
1858 }
1859 else if (is_eq(attr.name(), "trust") || is_eq(attr.name(), "dmrpp:trust")) {
1860 href_trusted = is_eq(attr.value(), "true");
1861 }
1862 }
1863
1864 if (offset.empty() || size.empty())
1865 throw BESInternalError("Both size and offset are required for a chunk node.", __FILE__, __LINE__);
1866 if (!href.empty()) {
1867 shared_ptr<http::url> data_url(new http::url(href, href_trusted));
1868 if (filter_mask.empty())
1869 dc->add_chunk(data_url, dc->get_byte_order(), stoull(size), stoull(offset), chunk_position_in_array);
1870 else
1871 dc->add_chunk(data_url, dc->get_byte_order(), stoull(size), stoull(offset), stoul(filter_mask), chunk_position_in_array);
1872 }
1873 else {
1874 if (filter_mask.empty())
1875 dc->add_chunk(d_dataset_elem_href, dc->get_byte_order(), stoull(size), stoull(offset), chunk_position_in_array);
1876 else
1877 dc->add_chunk(d_dataset_elem_href, dc->get_byte_order(), stoull(size), stoull(offset), stoul(filter_mask), chunk_position_in_array);
1878 }
1879
1880 dc->accumlate_storage_size(stoull(size));
1881}
1882
1883void DMZ::process_block(DmrppCommon *dc, const xml_node &chunk,unsigned int block_count) const
1884{
1885 string href;
1886 string trust;
1887 string offset;
1888 string size;
1889 string chunk_position_in_array;
1890 string filter_mask;
1891 bool href_trusted = false;
1892
1893 for (xml_attribute attr = chunk.first_attribute(); attr; attr = attr.next_attribute()) {
1894 if (is_eq(attr.name(), "offset")) {
1895 offset = attr.value();
1896 }
1897 else if (is_eq(attr.name(), "nBytes")) {
1898 size = attr.value();
1899 }
1900 else if (is_eq(attr.name(), "href")) {
1901 href = attr.value();
1902 }
1903 else if (is_eq(attr.name(), "trust") || is_eq(attr.name(), "dmrpp:trust")) {
1904 href_trusted = is_eq(attr.value(), "true");
1905 }
1906
1907 }
1908
1909 if (offset.empty() || size.empty())
1910 throw BESInternalError("Both size and offset are required for a block node.", __FILE__, __LINE__);
1911 if (!href.empty()) {
1912 shared_ptr<http::url> data_url(new http::url(href, href_trusted));
1913 dc->add_chunk(data_url, dc->get_byte_order(), stoull(size), stoull(offset),true, block_count);
1914 }
1915 else
1916 dc->add_chunk(d_dataset_elem_href, dc->get_byte_order(), stoull(size), stoull(offset), true, block_count);
1917
1918
1919 dc->accumlate_storage_size(stoull(size));
1920}
1921
1928void DMZ::process_cds_node(DmrppCommon *dc, const xml_node &chunks)
1929{
1930 for (auto child = chunks.child("dmrpp:chunkDimensionSizes"); child; child = child.next_sibling()) {
1931 if (is_eq(child.name(), "dmrpp:chunkDimensionSizes")) {
1932 string sizes = child.child_value();
1933 dc->parse_chunk_dimension_sizes(sizes);
1934 }
1935 }
1936}
1937
1938static void add_fill_value_information(DmrppCommon *dc, const string &value_string, libdap::Type fv_type)
1939{
1940 dc->set_fill_value_string(value_string);
1941 dc->set_fill_value_type(fv_type);
1942 dc->set_uses_fill_value(true);
1943 }
1944
1953bool DMZ::process_chunks(BaseType *btp, const xml_node &var_node) const
1954{
1955 auto chunks = var_node.child("dmrpp:chunks");
1956 if(!chunks)
1957 return false;
1958
1959 bool has_fill_value = false;
1960
1961 unsigned int block_count = 0;
1962 bool is_multi_lb_chunks = false;
1963
1964 for (xml_attribute attr = chunks.first_attribute(); attr; attr = attr.next_attribute()) {
1965
1966 if (is_eq(attr.name(), "compressionType")) {
1967 dc(btp)->set_filter(attr.value());
1968 }
1969 else if (is_eq(attr.name(), "deflateLevel")) {
1970 string def_lev_str = attr.value();
1971 // decompose the string.
1972 vector<string> def_lev_str_vec = BESUtil::split(def_lev_str, ' ' );
1973 vector<unsigned int> def_levels;
1974 for (const auto &def_lev:def_lev_str_vec)
1975 def_levels.push_back(stoul(def_lev));
1976 dc(btp)->set_deflate_levels(def_levels);
1977 }
1978 else if (is_eq(attr.name(), "fillValue")) {
1979
1980 // Throws BESInternalError when unsupported types detected.
1981 string unsupported_type;
1982 if(flagged_as_unsupported_type(var_node,unsupported_type)){
1983 stringstream msg;
1984 msg << prolog << "Found a dmrpp:chunk/@fillValue with a value of ";
1985 msg << "'" << unsupported_type << "' this means that ";
1986 msg << "the Hyrax service is unable to process this variable/dataset.";
1987 throw BESInternalError(msg.str(),__FILE__,__LINE__);
1988 }
1989
1990 has_fill_value = true;
1991
1992 // Fill values are only supported for Arrays and scalar numeric datatypes (7/12/22)
1993 if (btp->type()==dods_url_c
1994 || btp->type() == dods_sequence_c || btp->type() == dods_grid_c)
1995 throw BESInternalError("Fill Value chunks are unsupported for URL, sequence and grid types.", __FILE__, __LINE__);
1996
1997 if (btp->type() == dods_structure_c) {
1998 string fvalue_str = attr.value();
1999 }
2000
2001 if (btp->type() == dods_array_c) {
2002 auto array = dynamic_cast<libdap::Array*>(btp);
2003 add_fill_value_information(dc(btp), attr.value(), array->var()->type());
2004 }
2005 else
2006 add_fill_value_information(dc(btp), attr.value(), btp->type());
2007 }
2008 else if (is_eq(attr.name(), "byteOrder"))
2009 dc(btp)->ingest_byte_order(attr.value());
2010
2011 // Here we don't need to check the structOffset attribute if the datatype is not dods_structure_c or array of dods_structure_c.
2012 // But since most variables won't have the structOffset attribute, the code will NOT even go to the following "else if block" after
2013 // looping through the last attribute. So still keep the following implementation.
2014 else if (is_eq(attr.name(), "structOffset")) {
2015 string so_str = attr.value();
2016 // decompose the string.
2017 vector<string> so_str_vec = BESUtil::split(so_str, ' ' );
2018 vector<unsigned int> struct_offsets;
2019 for (const auto &s_off:so_str_vec)
2020 struct_offsets.push_back(stoul(s_off));
2021 dc(btp)->set_struct_offsets(struct_offsets);
2022 }
2023 // The following only applies to rare cases when handling HDF4, most cases won't even come here.
2024 else if (is_eq(attr.name(),"LBChunk")) {
2025 string is_lbchunk_value = attr.value();
2026 if (is_lbchunk_value == "true") {
2027 is_multi_lb_chunks = true;
2028 dc(btp)->set_multi_linked_blocks_chunk(true);
2029 }
2030 }
2031
2032 }
2033
2034 // reset one_chunk_fillvalue to false if has_fill_value = false
2035 if (has_fill_value == false && dc(btp)->get_one_chunk_fill_value() == true) // reset fillvalue
2036 dc(btp)->set_one_chunk_fill_value(false);
2037
2038 // Look for the chunksDimensionSizes element - it will not be present for contiguous data
2039 process_cds_node(dc(btp), chunks);
2040
2041 // If child node "dmrpp:chunk" is found, the child node "dmrpp:block" will be not present.
2042 // They are mutual exclusive.
2043
2044 bool is_chunked_storage = false;
2045 for (auto chunk = chunks.child("dmrpp:chunk"); chunk; chunk = chunk.next_sibling()) {
2046 if (is_eq(chunk.name(), "dmrpp:chunk")) {
2047 is_chunked_storage = true;
2048 break;
2049 }
2050 }
2051
2052 if (is_chunked_storage && is_multi_lb_chunks== false) {
2053 // Chunks for this node will be held in the var_node siblings.
2054 for (auto chunk = chunks.child("dmrpp:chunk"); chunk; chunk = chunk.next_sibling()) {
2055 if (is_eq(chunk.name(), "dmrpp:chunk")) {
2056 process_chunk(dc(btp), chunk);
2057 }
2058 }
2059 }
2060 else {
2061
2062 // Blocks for this node, we need to first check if there is only one block. If this is the case,
2063 // we should issue an error.
2064 for (auto chunk = chunks.child("dmrpp:block"); chunk; chunk = chunk.next_sibling()) {
2065 if (is_eq(chunk.name(), "dmrpp:block")) {
2066 block_count++;
2067 }
2068 if (block_count >1)
2069 break;
2070 }
2071 }
2072 if (block_count > 0) {
2073 if (block_count == 1)
2074 throw BESInternalError(" The number of linked block is 1, but it should be > 1.", __FILE__, __LINE__);
2075 if (block_count >1) {
2076 // set using linked block
2077 dc(btp)->set_using_linked_block();
2078 // reset the count to 0 to process the blocks.
2079 block_count = 0;
2080 for (auto chunk = chunks.child("dmrpp:block"); chunk; chunk = chunk.next_sibling()) {
2081 if (is_eq(chunk.name(), "dmrpp:block")) {
2082 process_block(dc(btp), chunk, block_count);
2083 BESDEBUG(PARSER, prolog << "This count of linked block of this variable is: " << block_count << endl);
2084 block_count++;
2085 }
2086 }
2087 dc(btp)->set_total_linked_blocks(block_count);
2088 }
2089 }
2090 else if (is_multi_lb_chunks) {
2091
2092 queue <vector<pair<unsigned long long,unsigned long long>>> mb_index_queue;
2093 vector<pair<unsigned long long, unsigned long long>> offset_length_pair;
2094
2095 // Loop through all the chunks.
2096 for (auto chunk = chunks.child("dmrpp:chunk"); chunk; chunk = chunk.next_sibling()) {
2097
2098 // Check the block offset and length for this chunk.
2099 if (is_eq(chunk.name(), "dmrpp:chunk"))
2100 add_mblock_index(chunk, mb_index_queue,offset_length_pair);
2101 }
2102 // This is the last one.
2103 mb_index_queue.push(offset_length_pair);
2104
2105 // Now we get all the blocks and we will process them.
2106 for (auto chunk = chunks.child("dmrpp:chunk"); chunk; chunk = chunk.next_sibling()) {
2107 if (is_eq(chunk.name(), "dmrpp:chunk"))
2108 process_multi_blocks_chunk(dc(btp),chunk, mb_index_queue);
2109 }
2110 dc(btp)->set_multi_linked_blocks_chunk(true);
2111
2112 }
2113 return true;
2114
2115}
2116
2117
2123vector<unsigned long long> DMZ::get_array_dims(Array *array)
2124{
2125 vector<unsigned long long> array_dim_sizes;
2126 for (auto i= array->dim_begin(), e = array->dim_end(); i != e; ++i) {
2127 array_dim_sizes.push_back(array->dimension_size_ll(i));
2128 }
2129
2130 return array_dim_sizes;
2131}
2132
2143size_t DMZ::logical_chunks(const vector <unsigned long long> &array_dim_sizes, const DmrppCommon *dc)
2144{
2145 auto const& chunk_dim_sizes = dc->get_chunk_dimension_sizes();
2146 if (chunk_dim_sizes.size() != array_dim_sizes.size()) {
2147 ostringstream oss;
2148 oss << "Expected the chunk and array rank to match (chunk: " << chunk_dim_sizes.size() << ", array: "
2149 << array_dim_sizes.size() << ")";
2150 throw BESInternalError(oss.str(), __FILE__, __LINE__);
2151 }
2152
2153 size_t num_logical_chunks = 1;
2154 auto i = array_dim_sizes.begin();
2155 for (auto chunk_dim_size: chunk_dim_sizes) {
2156 auto array_dim_size = *i++;
2157 num_logical_chunks *= (size_t)ceil((float)array_dim_size / (float)chunk_dim_size);
2158 }
2159
2160 return num_logical_chunks;
2161}
2162
2177set< vector<unsigned long long> > DMZ::get_chunk_map(const vector<shared_ptr<Chunk>> &chunks)
2178{
2179 set< vector<unsigned long long> > chunk_map;
2180 for (auto const &chunk: chunks) {
2181 chunk_map.insert(chunk->get_position_in_array());
2182 }
2183
2184 return chunk_map;
2185}
2186
2195void DMZ::process_fill_value_chunks(BaseType *btp, const set<shape> &chunk_map, const shape &chunk_shape,
2196 const shape &array_shape, unsigned long long chunk_size, unsigned int struct_size)
2197{
2198 auto dcp = dc(btp);
2199 // Use an Odometer to walk over each potential chunk
2200 DmrppChunkOdometer odometer(array_shape, chunk_shape);
2201 do {
2202 const auto &s = odometer.indices();
2203 if (chunk_map.find(s) == chunk_map.end()) {
2204
2205 // Fill Value chunk
2206 // what we need byte order, pia, fill value
2207 // We also need to check the user-defined fill value case.
2208 vector<pair<Type,int>> structure_type_element;
2209 bool ret_value = is_simple_dap_structure_scalar_array(btp,structure_type_element);
2210 if (ret_value) {
2211 if (struct_size !=0)
2212 dcp->add_chunk(dcp->get_byte_order(), dcp->get_fill_value(), dcp->get_fill_value_type(), chunk_size, s, struct_size);
2213 else
2214 dcp->add_chunk(dcp->get_byte_order(), dcp->get_fill_value(), dcp->get_fill_value_type(), chunk_size, s, structure_type_element);
2215 }
2216 else
2217 dcp->add_chunk(dcp->get_byte_order(), dcp->get_fill_value(), dcp->get_fill_value_type(), chunk_size, s);
2218 }
2219 } while (odometer.next());
2220}
2221
2230void DMZ::load_chunks(BaseType *btp)
2231{
2232 if (dc(btp)->get_chunks_loaded())
2233 return;
2234
2235 // goto the DOM tree node for this variable
2236 xml_node var_node = get_variable_xml_node(btp);
2237 if (var_node == nullptr)
2238 throw BESInternalError("Could not find location of variable in the DMR++ XML document.", __FILE__, __LINE__);
2239
2240 // Chunks for this node will be held in the var_node siblings. For a given BaseType, there should
2241 // be only one chunks node xor one chunk node.
2242 int chunks_found = 0;
2243 int chunk_found = 0;
2244 int compact_found = 0;
2245 int vlsa_found = 0;
2246 int missing_data_found = 0;
2247 int special_structure_data_found = 0;
2248
2249 // Chunked data
2250 if (process_chunks(btp, var_node)) {
2251 chunks_found = 1;
2252 BESDEBUG(PARSER, prolog << "This variable's chunks storage size is: " << dc(btp)->get_var_chunks_storage_size() << endl);
2253 auto array = dynamic_cast<Array*>(btp);
2254 // It's possible to have a chunk, but not have a chunk dimension sizes element
2255 // when there is only one chunk (e.g., with HDF5 Contiguous storage). jhrg 5/5/22
2256 if (array && !dc(btp)->get_chunk_dimension_sizes().empty()) {
2257 auto const &array_shape = get_array_dims(array);
2258 size_t num_logical_chunks = logical_chunks(array_shape, dc(btp));
2259 // do we need to run this code?
2260 if (num_logical_chunks != dc(btp)->get_chunks_size()) {
2261 auto const &chunk_map = get_chunk_map(dc(btp)->get_immutable_chunks());
2262 // Since the variable has some chunks that hold only fill values, add those chunks
2263 // to the vector of chunks.
2264 auto const &chunk_shape = dc(btp)->get_chunk_dimension_sizes();
2265 unsigned long long chunk_size_bytes = array->var()->width(); // start with the element size in bytes
2266 vector<unsigned int> s_off = dc(btp)->get_struct_offsets();
2267 if (!s_off.empty())
2268 chunk_size_bytes = s_off.back();
2269
2270 for (auto dim_size: chunk_shape)
2271 chunk_size_bytes *= dim_size;
2272 unsigned int struct_size =(s_off.empty())?0:s_off.back();
2273 process_fill_value_chunks(btp, chunk_map, dc(btp)->get_chunk_dimension_sizes(),
2274 array_shape, chunk_size_bytes,struct_size);
2275 // Now we need to check if this var only contains one chunk.
2276 // If yes, we will go ahead to set one_chunk_fill_value be true.
2277 // While later in process_chunks(), we will check if fillValue is defined and adjust the value.
2278 if (num_logical_chunks == 1)
2279 dc(btp)->set_one_chunk_fill_value(true);
2280 dc(btp)->set_processing_fv_chunks();
2281
2282
2283 }
2284 }
2285 // If both chunks and chunk_dimension_sizes are empty, this is contiguous storage
2286 // with nothing but fill values. Make a single chunk that can hold the fill values.
2287 else if (array && dc(btp)->get_immutable_chunks().empty()) {
2288 auto const &array_shape = get_array_dims(array);
2289
2290 // Position in array is 0, 0, ..., 0 were the number of zeros is the number of array dimensions
2291 shape pia(0,array_shape.size());
2292 auto dcp = dc(btp);
2293
2294 // Since there is one chunk, the chunk size and array size are one and the same.
2295 unsigned long long array_size_bytes = 1;
2296 for (auto dim_size: array_shape)
2297 array_size_bytes *= dim_size;
2298
2299 if (array->var()->type() == dods_str_c) {
2300
2301 size_t str_size = dcp->get_fill_value().size();
2302 string fvalue = dcp->get_fill_value();
2303
2304 // array size above is in _elements_, multiply by the element width to get bytes
2305 // We encounter a special case here. In one NASA file, the fillvalue='\0', so
2306 // when converting to string fillvalue becomes "" and the string size is 0.
2307 // This won't correctly pass the fillvalue buffer downstream. So here we
2308 // change the fillvalue to ' ' so that it can sucessfully generate netCDF file via fileout netcdf.
2309 // Also for this special case, the string length is 1.
2310 // KY 2022-12-22
2311 if(dcp->get_fill_value()=="") {
2312 fvalue =" ";
2313 }
2314 else
2315 array_size_bytes *=str_size;
2316 dcp->add_chunk(dcp->get_byte_order(), fvalue, dcp->get_fill_value_type(), array_size_bytes, pia);
2317 }
2318 else {
2319 array_size_bytes *= array->var()->width();
2320
2321 // We also need to check the user-defined fill value case.
2322 vector<pair<Type,int>> structure_type_element;
2323 bool ret_value = is_simple_dap_structure_scalar_array(btp,structure_type_element);
2324 if (ret_value)
2325 dcp->add_chunk(dcp->get_byte_order(), dcp->get_fill_value(), dcp->get_fill_value_type(), array_size_bytes, pia, structure_type_element);
2326 else
2327 dcp->add_chunk(dcp->get_byte_order(), dcp->get_fill_value(), dcp->get_fill_value_type(), array_size_bytes, pia);
2328 }
2329
2330 }
2331 // This is the case when the scalar variable that holds the fill value with the contiguous storage comes.
2332 // Note we only support numeric datatype now. KY 2022-07-12
2333 else if (btp->type()!=dods_array_c && dc(btp)->get_immutable_chunks().empty()) {
2334 if (btp->type() == dods_grid_c || btp->type() == dods_sequence_c || btp->type() ==dods_url_c) {
2335 ostringstream oss;
2336 oss << " For scalar variable with the contiguous storage that holds the fillvalue, only numeric"
2337 << " types are supported.";
2338 throw BESInternalError(oss.str(), __FILE__, __LINE__);
2339 }
2340 shape pia;
2341 auto dcp = dc(btp);
2342 if (btp->type() == dods_str_c) {
2343
2344 size_t array_size = dcp->get_fill_value().size();
2345 string fvalue = dcp->get_fill_value();
2346
2347 // We encounter a special case here. In one NASA file, the fillvalue='\0', so
2348 // when converting to string fillvalue becomes "" and the string size is 0.
2349 // This won't correctly pass the fillvalue buffer downstream. So here we
2350 // change the fillvalue to ' ' so that it can successfully generate netCDF file via fileout netcdf.
2351 // KY 2022-12-22
2352 if(dcp->get_fill_value()=="") {
2353 fvalue =" ";
2354 array_size = 1;
2355 }
2356 dcp->add_chunk(dcp->get_byte_order(), fvalue, dcp->get_fill_value_type(), array_size, pia);
2357 }
2358 else {
2359 vector<pair<Type,int>> structure_type_element;
2360 bool ret_value = is_simple_dap_structure_scalar_array(btp,structure_type_element);
2361 if (ret_value)
2362 dcp->add_chunk(dcp->get_byte_order(), dcp->get_fill_value(), dcp->get_fill_value_type(), btp->width(), pia, structure_type_element);
2363 else
2364 dcp->add_chunk(dcp->get_byte_order(), dcp->get_fill_value(), dcp->get_fill_value_type(), btp->width(), pia);
2365 }
2366
2367 }
2368 }
2369
2370 // Contiguous data
2371 auto chunk = var_node.child("dmrpp:chunk");
2372 if (chunk) {
2373 chunk_found = 1;
2374 process_chunk(dc(btp), chunk);
2375 }
2376
2377 auto compact = var_node.child("dmrpp:compact");
2378 if (compact) {
2379 compact_found = 1;
2380 process_compact(btp, compact);
2381 }
2382
2383 auto missing_data = var_node.child("dmrpp:missingdata");
2384 if (missing_data) {
2385 missing_data_found = 1;
2386 process_missing_data(btp, missing_data);
2387 }
2388
2389 auto special_structure_data = var_node.child("dmrpp:specialstructuredata");
2390 if (special_structure_data) {
2391 special_structure_data_found = 1;
2392 process_special_structure_data(btp, special_structure_data);
2393 }
2394
2395 auto vlsa_element = var_node.child(DMRPP_VLSA_ELEMENT);
2396 if (vlsa_element) {
2397 vlsa_found = 1;
2398 process_vlsa(btp, vlsa_element);
2399 }
2400
2401 // Here we (optionally) check that exactly one of the supported types of node was found
2402 if (DmrppRequestHandler::d_require_chunks) {
2403 int elements_found = chunks_found + chunk_found + compact_found + vlsa_found + missing_data_found + special_structure_data_found;
2404 if (elements_found != 1) {
2405 ostringstream oss;
2406 oss << "Expected chunk, chunks or compact or variable length string or missing data or special structure data information in the DMR++ data. Found " << elements_found
2407 << " types of nodes.";
2408 throw BESInternalError(oss.str(), __FILE__, __LINE__);
2409 }
2410 }
2411
2412 dc(btp)->set_chunks_loaded(true);
2413}
2414
2415bool DMZ::is_simple_dap_structure_scalar_array(BaseType *btp, vector<pair<Type,int>> &structure_type_element) {
2416
2417 bool ret_value = false;
2418
2419 if (btp->type()==dods_array_c) {
2420
2421 auto t_a = dynamic_cast<Array *>(btp);
2422 Type t_array_var = t_a->var()->type();
2423 if (t_array_var == dods_structure_c) {
2424 auto t_s = dynamic_cast<Structure *>(t_a->var());
2425 ret_value = is_simple_dap_structure_internal(t_s, structure_type_element);
2426 }
2427 }
2428 else if (btp->type() == dods_structure_c) {
2429 auto t_s = dynamic_cast<Structure *>(btp);
2430 ret_value = is_simple_dap_structure_internal(t_s, structure_type_element);
2431 }
2432
2433 return ret_value;
2434}
2435
2436bool DMZ::is_simple_dap_structure_internal(const Structure *ds, vector<pair<Type,int>> &structure_type_element) {
2437
2438 bool ret_value = true;
2439 for (const auto &bt:ds->variables()) {
2440
2441 Type t_bt = bt->type();
2442
2443 // Only support array or scalar of float/int.
2444 if (t_bt == dods_array_c) {
2445 auto t_a = dynamic_cast<Array *>(bt);
2446 Type t_array_var = t_a->var()->type();
2447
2448 if (libdap::is_simple_type(t_array_var) == true && t_array_var != dods_str_c) {
2449 pair<Type,int> temp_pair;
2450 int64_t num_eles= t_a->length_ll();
2451 temp_pair.first = t_array_var;
2452 temp_pair.second = (int)(num_eles);
2453 structure_type_element.push_back(temp_pair);
2454 }
2455 else {
2456 ret_value = false;
2457 break;
2458 }
2459 }
2460 else if (libdap::is_simple_type(t_bt) == true && t_bt != dods_str_c) {
2461 pair<Type,int> temp_pair;
2462 temp_pair.first = t_bt;
2463 temp_pair.second = 1;
2464 structure_type_element.push_back(temp_pair);
2465 }
2466 else {
2467 ret_value = false;
2468 break;
2469 }
2470 }
2471
2472 return ret_value;
2473}
2474
2475void DMZ::handle_subset(DmrppArray *da, libdap::Array::Dim_iter dim_iter, unsigned long & subset_index, vector<unsigned long long> & subset_pos,
2476 vector<unsigned char>& subset_buf, vector<unsigned char>& whole_buf) {
2477
2478 // Obtain the number of elements in each dimension
2479 vector<unsigned long long> da_dims = da->get_shape(false);
2480
2481 // Obtain the number of bytes of each element
2482 unsigned int bytes_per_elem = da->prototype()->width();
2483
2484 // Obtain the start, stop and stride for this each dimension
2485 uint64_t start = da->dimension_start_ll(dim_iter, true);
2486 uint64_t stop = da->dimension_stop_ll(dim_iter, true);
2487 uint64_t stride = da->dimension_stride_ll(dim_iter, true);
2488
2489 dim_iter++;
2490
2491 // The end case for the recursion is dimIter == dim_end(); stride == 1 is an optimization
2492 // See the else clause for the general case.
2493 if (dim_iter == da->dim_end() && stride == 1) {
2494
2495 // For the start and stop indexes of the subset, get the matching indexes in the whole array.
2496 subset_pos.push_back(start);
2497 unsigned long long start_index = INDEX_nD_TO_1D( da_dims,subset_pos);
2498 subset_pos.pop_back();
2499
2500 subset_pos.push_back(stop);
2501 unsigned long long stop_index = INDEX_nD_TO_1D( da_dims,subset_pos);
2502 subset_pos.pop_back();
2503
2504 // Copy data block from start_index to stop_index
2505 unsigned char * temp_subset_buf = subset_buf.data() + subset_index*bytes_per_elem;
2506 unsigned char * temp_whole_buf = whole_buf.data() + start_index*bytes_per_elem;
2507 size_t num_bytes_to_copy = (stop_index-start_index+1)*bytes_per_elem;
2508
2509 memcpy(temp_subset_buf,temp_whole_buf,num_bytes_to_copy);
2510
2511 // Move the subset_index to the next location.
2512 subset_index = subset_index +(stop_index-start_index+1);
2513
2514 }
2515 else {
2516 for (uint64_t myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
2517
2518 // Is it the last dimension?
2519 if (dim_iter != da->dim_end()) {
2520 // Nope! Then we recurse to the last dimension to read stuff
2521 subset_pos.push_back(myDimIndex);
2522
2523 // The recursive function will fill in the subset_pos until the dim_end().
2524 handle_subset(da,dim_iter,subset_index, subset_pos,subset_buf,whole_buf);
2525 subset_pos.pop_back();
2526 }
2527 else {
2528 // We are at the last (innermost) dimension, so it's time to copy values.
2529 subset_pos.push_back(myDimIndex);
2530 unsigned int sourceIndex = INDEX_nD_TO_1D( da_dims,subset_pos);
2531 subset_pos.pop_back();
2532
2533 unsigned char * temp_subset_buf = subset_buf.data() + subset_index*bytes_per_elem;
2534 unsigned char * temp_whole_buf = whole_buf.data() + sourceIndex*bytes_per_elem;
2535 memcpy(temp_subset_buf,temp_whole_buf,bytes_per_elem);
2536
2537 subset_index++;
2538 }
2539 }
2540 }
2541}
2542
2543void DMZ::add_mblock_index(const xml_node &chunk, queue<vector<pair<unsigned long long, unsigned long long >>>& mb_index_queue,
2544 vector<pair<unsigned long long, unsigned long long>>& offset_length_pair) const{
2545
2546 string LBIndex_value;
2547 for (xml_attribute attr = chunk.first_attribute(); attr; attr = attr.next_attribute()) {
2548 if (is_eq(attr.name(),"LinkedBlockIndex")) {
2549 LBIndex_value = attr.value();
2550 break;
2551 }
2552 }
2553
2554 // We find the linked blocks in this chunk
2555 if (LBIndex_value.empty() == false) {
2556
2557 pair<unsigned long long, unsigned long long> temp_offset_length;
2558
2559 // We need to loop through the chunk attributes again to find the offset and length.
2560 bool found_offset = false;
2561 bool found_length = false;
2562 for (xml_attribute attr = chunk.first_attribute(); attr; attr = attr.next_attribute()) {
2563 if (is_eq(attr.name(), "offset")) {
2564 string offset = attr.value();
2565 temp_offset_length.first = stoull(offset);
2566 found_offset = true;
2567 }
2568 else if (is_eq(attr.name(), "nBytes")) {
2569 string size = attr.value();
2570 temp_offset_length.second = stoull(size);
2571 found_length = true;
2572 }
2573 if (found_offset && found_length)
2574 break;
2575 }
2576
2577 // We make this a new chunk that stores the multiple blocks.
2578 if (LBIndex_value == "0") {
2579 if (offset_length_pair.empty() == false) {
2580 mb_index_queue.push(offset_length_pair);
2581
2582 // Here offset_length_pair will be reused, so clear it.
2583 offset_length_pair.clear();
2584 offset_length_pair.push_back(temp_offset_length);
2585 }
2586 else
2587 offset_length_pair.push_back(temp_offset_length);
2588 }
2589 else
2590 offset_length_pair.push_back(temp_offset_length);
2591 }
2592
2593}
2594
2595void DMZ::process_multi_blocks_chunk(dmrpp::DmrppCommon *dc, const pugi::xml_node &chunk, std::queue<std::vector<std::pair<unsigned long long, unsigned long long>>>& mb_index_queue) const {
2596
2597 // Follow process_chunk
2598 string href;
2599 string trust;
2600 string offset;
2601 string size;
2602 string chunk_position_in_array;
2603 string filter_mask;
2604 bool href_trusted = false;
2605
2606 // We will only check if the last attribute is the "LinkedBlockIndex".
2607 // If yes, we will check the "LinkedBlockIndex" value, mark it if it is the first index(0).
2608 // If the "LinkedBlockIndex" is not 0, we simply return. The information of this linked block is retrieved from the mb_index_queue already.
2609 bool multi_lbs_chunk = false;
2610 auto LBI_attr = chunk.last_attribute();
2611 if (is_eq(LBI_attr.name(),"LinkedBlockIndex")) {
2612 string LBI_attr_value = LBI_attr.value();
2613 if (LBI_attr_value =="0")
2614 multi_lbs_chunk = true;
2615 else
2616 return;
2617 }
2618 else {// This should happen really rarely, still we try to cover the corner case. We loop through all the attributes and search if Linked BlockIndex is present.
2619 for (xml_attribute attr = chunk.first_attribute(); attr; attr = attr.next_attribute()) {
2620 if (is_eq(LBI_attr.name(),"LinkedBlockIndex")) {
2621 string LBI_attr_value = LBI_attr.value();
2622 if (LBI_attr_value =="0")
2623 multi_lbs_chunk = true;
2624 else
2625 return;
2626 }
2627 }
2628 }
2629
2630 // For linked block cases, as far as we know, we don't need to load fill values as the HDF5 case. So we ignore checking and filling the fillvalue to save performance.
2631 for (xml_attribute attr = chunk.first_attribute(); attr; attr = attr.next_attribute()) {
2632
2633 if (is_eq(attr.name(), "offset")) {
2634 offset = attr.value();
2635 }
2636 else if (is_eq(attr.name(), "nBytes")) {
2637 size = attr.value();
2638 }
2639 else if (is_eq(attr.name(), "chunkPositionInArray")) {
2640 chunk_position_in_array = attr.value();
2641 }
2642 else if (is_eq(attr.name(), "fm")) {
2643 filter_mask = attr.value();
2644 }
2645 else if (is_eq(attr.name(), "href")) {
2646 href = attr.value();
2647 }
2648 else if (is_eq(attr.name(), "trust") || is_eq(attr.name(), "dmrpp:trust")) {
2649 href_trusted = is_eq(attr.value(), "true");
2650 }
2651 }
2652
2653 if (offset.empty() || size.empty())
2654 throw BESInternalError("Both size and offset are required for a chunk node.", __FILE__, __LINE__);
2655
2656 if (multi_lbs_chunk) {//The chunk that has linked blocks
2657
2658 vector<pair<unsigned long long, unsigned long long>> temp_pair;
2659 if (!mb_index_queue.empty())
2660 temp_pair = mb_index_queue.front();
2661
2662 if (!href.empty()) {
2663 shared_ptr<http::url> data_url(new http::url(href, href_trusted));
2664 dc->add_chunk(data_url, dc->get_byte_order(), chunk_position_in_array,temp_pair);
2665 }
2666 else {
2667 dc->add_chunk(d_dataset_elem_href, dc->get_byte_order(), chunk_position_in_array, temp_pair);
2668 }
2669 mb_index_queue.pop(); // Remove the processed element
2670
2671 }
2672 else { //General Chunk, not the linked block.
2673 if (!href.empty()) {
2674 shared_ptr<http::url> data_url(new http::url(href, href_trusted));
2675 dc->add_chunk(data_url, dc->get_byte_order(), stoull(size), stoull(offset), chunk_position_in_array);
2676 }
2677 else {
2678 dc->add_chunk(d_dataset_elem_href, dc->get_byte_order(), stoull(size), stoull(offset), chunk_position_in_array);
2679 }
2680 }
2681
2682
2683 dc->accumlate_storage_size(stoull(size));
2684
2685}
2686
2687// Return the index of the pos in nD array to the equivalent pos in 1D array
2688size_t DMZ::INDEX_nD_TO_1D (const std::vector < unsigned long long > &dims,
2689 const std::vector < unsigned long long > &pos) {
2690 //
2691 // "int a[10][20][30] // & a[1][2][3] == a + (20*30+1 + 30*2 + 1 *3)"
2692 // "int b[10][2] // &b[1][1] == b + (2*1 + 1)"
2693 //
2694 if(dims.size () != pos.size ())
2695 throw InternalErr(__FILE__,__LINE__,"dimension error in INDEX_nD_TO_1D routine.");
2696 size_t sum = 0;
2697 size_t start = 1;
2698
2699 for (const auto & one_pos:pos) {
2700 size_t m = 1;
2701 for (size_t j = start; j < dims.size (); j++)
2702 m *= dims[j];
2703 sum += m * one_pos;
2704 start++;
2705 }
2706 return sum;
2707}
2708
2710
2711} // namespace dmrpp
exception thrown if internal error encountered
static std::vector< std::string > split(const std::string &s, char delim='/', bool skip_empty=true)
Splits the string s into the return vector of tokens using the delimiter delim and skipping empty val...
Definition BESUtil.cc:1068
static TheBESKeys * TheKeys()
Access to the singleton.
Definition TheBESKeys.cc:85
static bool read_bool_key(const std::string &key, bool default_value)
Read a boolean-valued key from the bes.conf file.
DMZ()=default
Build a DMZ without simultaneously parsing an XML document.
virtual void load_chunks(libdap::BaseType *btp)
Load the chunk information into a variable.
Definition DMZ.cc:2230
void parse_xml_doc(const std::string &filename)
Build the DOM tree for a DMR++ XML document.
Definition DMZ.cc:185
virtual void build_thin_dmr(libdap::DMR *dmr)
populate the DMR instance as a 'thin DMR'
Definition DMZ.cc:773
void parse_xml_string(const std::string &contents)
Build a DOM tree for a DMR++ using content from a string.
Definition DMZ.cc:290
Extend libdap::Array so that a handler can read data using a DMR++ file.
Definition DmrppArray.h:77
Size and offset information of data included in DMR++ files.
Definition DmrppCommon.h:97
void set_multi_linked_blocks_chunk(bool value)
Set the value of the boolean variable that indicates this variable contains multiple linked blocks in...
void set_disable_dio(bool value)
Set the value of the disable_dio property.
virtual void parse_chunk_dimension_sizes(const std::string &chunk_dim_sizes_string)
Set the dimension sizes for a chunk.
virtual unsigned long add_chunk(std::shared_ptr< http::url > d_data_url, const std::string &byte_order, unsigned long long size, unsigned long long offset, const std::string &position_in_array)
Adds a chunk to the vector of chunk refs (byteStreams) and returns the size of the chunks internal ve...
void set_filter(const std::string &value)
Set the value of the filters property.
virtual void ingest_byte_order(const std::string &byte_order_string)
Parses the text content of the XML element chunks:byteOrder.
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
void set_compact(bool value)
Set the value of the compact property.
void set_missing_data(bool value)
Set the value of the missing data.
virtual void set_one_chunk_fill_value(bool ufv)
Set the one_chunk_fill_value property.
Type
Type of JSON value.
Definition rapidjson.h:664