bes Updated for version 3.21.1
The Backend Server (BES) is the lower two tiers of the Hyrax data server
ArrayAggregateOnOuterDimension.cc
1
2// This file is part of the "NcML Module" project, a BES module designed
3// to allow NcML files to be used as a wrapper to add
4// AIS to existing datasets of any format.
5//
6// Copyright (c) 2010 OPeNDAP, Inc.
7// Author: Michael Johnson <m.johnson@opendap.org>
8//
9// For more information, please also see the main website: http://opendap.org/
10//
11// This library is free software; you can redistribute it and/or
12// modify it under the terms of the GNU Lesser General Public
13// License as published by the Free Software Foundation; either
14// version 2.1 of the License, or (at your option) any later version.
15//
16// This library is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19// Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public
22// License along with this library; if not, write to the Free Software
23// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24//
25// Please see the files COPYING and COPYRIGHT for more information on the GLPL.
26//
27// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
29
30#include "config.h"
31
32#include "ArrayAggregateOnOuterDimension.h"
33#include "AggregationException.h"
34
35#include <libdap/DataDDS.h> // libdap::DataDDS
36#include <libdap/Marshaller.h>
37
38// only NCML backlinks we want in this agg_util class.
39#include "NCMLDebug.h" // BESDEBUG and throw macros
40#include "BESDebug.h"
41#include "BESStopWatch.h"
42
43#define DEBUG_CHANNEL "agg_util"
44#define prolog string("ArrayAggregateOnOuterDimension::").append(__func__).append("() - ")
45
46namespace agg_util {
47
49 AMDList memberDatasets, unique_ptr<ArrayGetterInterface> arrayGetter, const Dimension& newDim) :
50 ArrayAggregationBase(proto, std::move(memberDatasets),std::move(arrayGetter)), // no new dim yet in super chain
51 _newDim(newDim)
52{
53 BESDEBUG(DEBUG_CHANNEL, "ArrayAggregateOnOuterDimension: ctor called!" << endl);
54
55 // Up the rank of the array using the new dimension as outer (prepend)
56 BESDEBUG(DEBUG_CHANNEL, "ArrayAggregateOnOuterDimension: adding new outer dimension: " << _newDim.name << endl);
57 prepend_dim(_newDim.size, _newDim.name);
58}
59
61 ArrayAggregationBase(proto), _newDim()
62{
63 BESDEBUG(DEBUG_CHANNEL, "ArrayAggregateOnOuterDimension() copy ctor called!" << endl);
64 duplicate(proto);
65}
66
68{
69 BESDEBUG(DEBUG_CHANNEL, "~ArrayAggregateOnOuterDimension() dtor called!" << endl);
70 cleanup();
71}
72
78
81{
82 if (this != &rhs) {
83 cleanup();
84 ArrayAggregationBase::operator=(rhs);
85 duplicate(rhs);
86 }
87 return *this;
88}
89
90// Set this to 0 to get the old behavior where the entire response
91// (for this variable) is built in memory and then sent to the client.
92#define PIPELINING 1
93
113
114bool ArrayAggregateOnOuterDimension::serialize(libdap::ConstraintEvaluator &eval, libdap::DDS &dds,
115 libdap::Marshaller &m, bool ce_eval)
116{
117
118 BES_STOPWATCH_START(DEBUG_CHANNEL, prolog + "Timing");
119
120 // Only continue if we are supposed to serialize this object at all.
121 if (!(send_p() || is_in_selection())) {
122 BESDEBUG_FUNC(DEBUG_CHANNEL, "Object not in output, skipping... name=" << name() << endl);
123 return true;
124 }
125
126 bool status = false;
127
128 delete bes_timing::elapsedTimeToReadStart;
129 bes_timing::elapsedTimeToReadStart = 0;
130
131 if (!read_p()) {
132 // call subclass impl
134 // outer one is the first in iteration
135 const Array::dimension& outerDim = *(dim_begin());
136 BESDEBUG(DEBUG_CHANNEL,
137 "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
138
139 // Be extra sure we have enough datasets for the given request
140 if (static_cast<unsigned int>(outerDim.size) != getDatasetList().size()) {
141 // Not sure whose fault it was, but tell the author
142 THROW_NCML_PARSE_ERROR(-1, "The new outer dimension of the joinNew aggregation doesn't "
143 " have the same size as the number of datasets in the aggregation!");
144 }
145
146#if PIPELINING
147 // Prepare our output buffer for our constrained length
148 m.put_vector_start(length());
149#else
150 reserve_value_capacity();
151#endif
152 // this index pointing into the value buffer for where to write.
153 // The buffer has a stride equal to the _pSubArrayProto->size().
154
155 // Keep this to do some error checking
156 int nextElementIndex = 0;
157
158 // Traverse the dataset array respecting hyperslab
159 for (int i = outerDim.start; i <= outerDim.stop && i < outerDim.size; i += outerDim.stride) {
160 AggMemberDataset& dataset = *((getDatasetList())[i]);
161
162 try {
164 name(), dataset, getArrayGetterInterface(), DEBUG_CHANNEL);
165#if PIPELINING
166 delete bes_timing::elapsedTimeToTransmitStart;
167 bes_timing::elapsedTimeToTransmitStart = 0;
168 m.put_vector_part(pDatasetArray->get_buf(), getGranuleTemplateArray().length(), var()->width(),
169 var()->type());
170#else
171 this->set_value_slice_from_row_major_vector(*pDatasetArray, nextElementIndex);
172#endif
173
174 pDatasetArray->clear_local_data();
175 }
177 std::ostringstream oss;
178 oss << "Got AggregationException while streaming dataset index=" << i << " data for location=\""
179 << dataset.getLocation() << "\" The error msg was: " << std::string(ex.what());
180 THROW_NCML_PARSE_ERROR(-1, oss.str());
181 }
182
183 // Jump forward by the amount we added.
184 nextElementIndex += getGranuleTemplateArray().length();
185 }
186
187 // If we succeeded, we are at the end of the array!
188 NCML_ASSERT_MSG(nextElementIndex == length(), "Logic error:\n"
189 "ArrayAggregateOnOuterDimension::read(): "
190 "At end of aggregating, expected the nextElementIndex to be the length of the "
191 "aggregated array, but it wasn't!");
192
193#if PIPELINING
194 m.put_vector_end();
195 status = true;
196#else
197 // Set the cache bit to avoid recomputing
198 set_read_p(true);
199
200 delete bes_timing::elapsedTimeToTransmitStart;
201 bes_timing::elapsedTimeToTransmitStart = 0;
202 status = libdap::Array::serialize(eval, dds, m, ce_eval);
203#endif
204 }
205 else {
206 status = libdap::Array::serialize(eval, dds, m, ce_eval);
207 }
208
209 return status;
210}
211
213// helpers
214
215void ArrayAggregateOnOuterDimension::duplicate(const ArrayAggregateOnOuterDimension& rhs)
216{
217 _newDim = rhs._newDim;
218}
219
220void ArrayAggregateOnOuterDimension::cleanup() const noexcept
221{
222 // not implemented
223}
224
225/* virtual */
227{
228 // transfer the constraints from this object into the subArray template
229 // skipping our first dim which is the new one and not in the subArray.
231 *this, // from this
232 true, // skip first dim in the copy since we handle it special
233 false, // also skip it in the toArray for the same reason.
234 true, // print debug
235 DEBUG_CHANNEL); // on this channel
236}
237
238/* virtual */
239// In this version of the code, I broke apart the call to
240// agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray()
241// into two calls: AggregationUtil::readDatasetArrayDataForAggregation()
242// and this->set_value_slice_from_row_major_vector(). This
244{
245 BES_STOPWATCH_START(DEBUG_CHANNEL, prolog + "Timing");
246
247 // outer one is the first in iteration
248 const Array::dimension& outerDim = *(dim_begin());
249 BESDEBUG(DEBUG_CHANNEL,
250 "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
251
252 // Be extra sure we have enough datasets for the given request
253 if (static_cast<unsigned int>(outerDim.size) != getDatasetList().size()) {
254 // Not sure whose fault it was, but tell the author
255 THROW_NCML_PARSE_ERROR(-1, "The new outer dimension of the joinNew aggregation doesn't "
256 " have the same size as the number of datasets in the aggregation!");
257 }
258
259 // Prepare our output buffer for our constrained length
260 reserve_value_capacity();
261
262 // this index pointing into the value buffer for where to write.
263 // The buffer has a stride equal to the _pSubArrayProto->size().
264 int nextElementIndex = 0;
265
266 // Traverse the dataset array respecting hyperslab
267 for (int i = outerDim.start; i <= outerDim.stop && i < outerDim.size; i += outerDim.stride) {
268 AggMemberDataset& dataset = *((getDatasetList())[i]);
269
270 try {
271 agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this, // into the output buffer of this object
272 nextElementIndex, // into the next open slice
273 getGranuleTemplateArray(), // constraints template
274 name(), // aggvar name
275 dataset, // Dataset who's DDS should be searched
276 getArrayGetterInterface(), DEBUG_CHANNEL);
277 }
279 std::ostringstream oss;
280 oss << "Got AggregationException while streaming dataset index=" << i << " data for location=\""
281 << dataset.getLocation() << "\" The error msg was: " << std::string(ex.what());
282 THROW_NCML_PARSE_ERROR(-1, oss.str());
283 }
284
285 // Jump forward by the amount we added.
286 nextElementIndex += getGranuleTemplateArray().length();
287 }
288
289 // If we succeeded, we are at the end of the array!
290 NCML_ASSERT_MSG(nextElementIndex == length(), "Logic error:\n"
291 "ArrayAggregateOnOuterDimension::read(): "
292 "At end of aggregating, expected the nextElementIndex to be the length of the "
293 "aggregated array, but it wasn't!");
294}
295
296}
const std::string & getLocation() const
static void addDatasetArrayDataToAggregationOutputArray(libdap::Array &oOutputArray, unsigned int atIndex, const libdap::Array &constrainedTemplateArray, const string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const string &debugChannel)
static void transferArrayConstraints(libdap::Array *pToArray, const libdap::Array &fromArray, bool skipFirstFromDim, bool skipFirstToDim, bool printDebug=false, const std::string &debugChannel="agg_util")
static libdap::Array * readDatasetArrayDataForAggregation(const libdap::Array &constrainedTemplateArray, const std::string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const std::string &debugChannel)
ArrayAggregateOnOuterDimension & operator=(const ArrayAggregateOnOuterDimension &rhs)
ArrayAggregateOnOuterDimension(const libdap::Array &proto, AMDList memberDatasets, std::unique_ptr< ArrayGetterInterface > arrayGetter, const Dimension &newDim)
ArrayAggregateOnOuterDimension * ptr_duplicate() override
bool serialize(libdap::ConstraintEvaluator &eval, libdap::DDS &dds, libdap::Marshaller &m, bool ce_eval) override
const AMDList & getDatasetList() const
const ArrayGetterInterface & getArrayGetterInterface() const
ArrayAggregationBase(const libdap::Array &granuleProto, AMDList memberDatasets, std::unique_ptr< ArrayGetterInterface > arrayGetter)
Helper class for temporarily hijacking an existing dhi to load a DDX response for one particular file...