bes, updated for version 3.21.1
The Backend Server (BES) is the lower two tiers of the Hyrax data server.
ArrayJoinExistingAggregation.cc
// This file is part of the "NcML Module" project, a BES module designed
// to allow NcML files to be used as a wrapper to add
// AIS to existing datasets of any format.
//
// Copyright (c) 2010 OPeNDAP, Inc.
// Author: Michael Johnson <m.johnson@opendap.org>
//
// For more information, please also see the main website: http://opendap.org/
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//
// Please see the files COPYING and COPYRIGHT for more information on the LGPL.
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.

#include "config.h"

#include <sstream>

#include <libdap/Marshaller.h>

#include "BESDebug.h"
#include "BESStopWatch.h"

#include "ArrayJoinExistingAggregation.h"

#include "AggregationException.h" // agg_util
#include "AggregationUtil.h" // agg_util
#include "NCMLDebug.h"

#define DEBUG_CHANNEL "ncml:2"
#define prolog string("ArrayJoinExistingAggregation::").append(__func__).append("() - ")

namespace agg_util {

ArrayJoinExistingAggregation::ArrayJoinExistingAggregation(const libdap::Array& granuleTemplate,
    AMDList memberDatasets, std::unique_ptr<ArrayGetterInterface> arrayGetter, const Dimension& joinDim) :
    ArrayAggregationBase(granuleTemplate, std::move(memberDatasets), std::move(arrayGetter)),
    _joinDim(joinDim)
{
    BESDEBUG_FUNC(DEBUG_CHANNEL, "Making the aggregated outer dimension be: " + joinDim.toString() + "\n");

    // We created the array with the given granule prototype, but the resulting
    // outer dimension size must be calculated according to the
    // value in the passed-in dimension object.
    libdap::Array::dimension& rOuterDim = *(dim_begin());
    NCML_ASSERT_MSG(rOuterDim.name == joinDim.name, "The outer dimension name of this is not the expected "
        "outer dimension name! Broken precondition: This ctor cannot be called "
        "without this being true!");
    rOuterDim.size = joinDim.size;
    // Force it to recompute constraints since we changed size.
    reset_constraint();

    ostringstream oss;
    AggregationUtil::printDimensions(oss, *this);

    BESDEBUG_FUNC(DEBUG_CHANNEL, "Constrained Dims after set are: " + oss.str());
}

ArrayJoinExistingAggregation::ArrayJoinExistingAggregation(const ArrayJoinExistingAggregation& rhs) :
    ArrayAggregationBase(rhs), _joinDim(rhs._joinDim)
{
    duplicate(rhs);
}

/* virtual */
ArrayJoinExistingAggregation::~ArrayJoinExistingAggregation()
{
    cleanup();
}

ArrayJoinExistingAggregation&
ArrayJoinExistingAggregation::operator=(const ArrayJoinExistingAggregation& rhs)
{
    if (this != &rhs) {
        cleanup();
        ArrayAggregationBase::operator=(rhs);
        duplicate(rhs);
    }
    return *this;
}

/* virtual */
ArrayJoinExistingAggregation*
ArrayJoinExistingAggregation::ptr_duplicate()
{
    return new ArrayJoinExistingAggregation(*this);
}

// Set this to 0 to get the old behavior where the entire response
// (for this variable) is built in memory and then sent to the client.
#define PIPELINING 1
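// With PIPELINING enabled, serialize() below streams each granule's values to
// the Marshaller via put_vector_start()/put_vector_part()/put_vector_end() as
// they are read; with it disabled, all values are first copied into this
// Array's internal buffer and libdap::Array::serialize() sends them at once.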

/* virtual */
// begin modifying here for the double buffering
// see notes about how this was written marked with '***'
// Following this method is an older version of serialize that
// provides no new functionality but does get run instead of the
// more general implementation in libdap::Array.
bool ArrayJoinExistingAggregation::serialize(libdap::ConstraintEvaluator &eval, libdap::DDS &dds, libdap::Marshaller &m,
    bool ce_eval)
{
    BES_STOPWATCH_START(DEBUG_CHANNEL, prolog + "Timing");

    // *** This serialize() implementation was made by starting with a simple version that
    // *** tested read_p(), calling read() if needed, and testing send_p() and is_in_selection(),
    // *** returning true if the data did not need to be sent. I moved that test here.

    // Only continue if we are supposed to serialize this object at all.
    if (!(send_p() || is_in_selection())) {
        BESDEBUG_FUNC(DEBUG_CHANNEL, "Object not in output, skipping... name=" << name() << endl);
        return true;
    }

    // *** Add status so that we can do our magic _or_ pass off the call to libdap
    // *** and collect the result either way.
    bool status = false;

    if (!read_p()) {
        // *** copy lines from AggregationBase::read() into here in place
        // *** of the call to read()

        // call subclass impl
        transferOutputConstraintsIntoGranuleTemplateHook();

        // *** Inserted code from readConstrainedGranuleArraysAndAggregateDataHook here

        // outer one is the first in iteration
        const Array::dimension& outerDim = *(dim_begin());
        BESDEBUG("ncml",
            "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);

        try {
#if PIPELINING
            // assumes the constraints are already set properly on this
            m.put_vector_start(length());
#else
            reserve_value_capacity();
#endif

            // Start the iteration state for the granule.
            const AMDList& datasets = getDatasetList(); // the list
            NCML_ASSERT(!datasets.empty());
            int currDatasetIndex = 0; // index into datasets
            const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();

            int outerDimIndexOfCurrDatasetHead = 0;
            int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
            bool currDatasetWasRead = false;

            // where in this output array we are writing next
            unsigned int nextOutputBufferElementIndex = 0;

            // Traverse the outer dimension constraints,
            // Keeping track of which dataset we need to
            // be inside for the given values of the constraint.
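            // For example (hypothetical sizes): with three granules of outer
            // size 10 each and a constraint start=5, stride=7, stop=25, the
            // selected outer indices 5, 12, 19 map to (granule 0, local 5),
            // (granule 1, local 2) and (granule 1, local 9); the while loop
            // below advances the granule state whenever the local index runs
            // past the current granule's size.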
            for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
                outerDimIndex += outerDim.stride) {
                // Figure out where the given outer index maps to in local granule space
                int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;

                // if this is beyond the dataset end, move state to the next dataset
                // and try again until we're in the proper interval, with the proper dataset.
                while (localGranuleIndex >= currDatasetSize) {
                    localGranuleIndex -= currDatasetSize;
                    outerDimIndexOfCurrDatasetHead += currDatasetSize;
                    ++currDatasetIndex;
                    NCML_ASSERT(currDatasetIndex < int(datasets.size()));
                    pCurrDataset = datasets[currDatasetIndex].get();
                    currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
                    currDatasetWasRead = false;

                    BESDEBUG_FUNC(DEBUG_CHANNEL,
                        "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
                }

                // If we haven't read in this granule yet (we passed a boundary)
                // then do it now. Map constraints into the local granule space.
                if (!currDatasetWasRead) {
                    BESDEBUG_FUNC(DEBUG_CHANNEL,
                        " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);

                    // Set up a constraint object for the actual granule read
                    // so that it only loads the data values in which we are
                    // interested.
                    Array& granuleConstraintTemplate = getGranuleTemplateArray();

                    // The inner dim constraints were set up in the containing read() call.
                    // The outer dim was left open for us to fix now...
                    Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();

                    // Modify the outer dim size to match the dataset we need to
                    // load. The inners MUST match so we can let those get
                    // checked later...
                    outerDimIt->size = currDatasetSize;
                    outerDimIt->c_size = currDatasetSize; // this will get recalculated below?

                    // Find the mapped endpoint.
                    // Basically, the fullspace endpoint mapped to the local offset,
                    // clamped into the local granule size.

                    // TODO: we will rewrite the following when we need to add the big array support.
                    // For now we replace the std::min() with our own version to avoid the
                    // inconsistent type comparison. KY 2022-12-28
#if 0
                    int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead,
                        (unsigned long long)currDatasetSize - 1);
#endif
                    int granuleStopIndex = outerDim.stop - outerDimIndexOfCurrDatasetHead;
                    if (granuleStopIndex > (currDatasetSize - 1))
                        granuleStopIndex = currDatasetSize - 1;

                    // We must clamp the stride to the interval of the
                    // dataset in order to avoid an exception in
                    // add_constraint on stride being larger than the dataset.
                    // TODO: we will rewrite the following when we need to add the big array support.
                    // For now we replace the std::min() with our own version to avoid the
                    // inconsistent type comparison. KY 2022-12-28
#if 0
                    int clampedStride = std::min(outerDim.stride, (unsigned long long)currDatasetSize);
#endif
                    int clampedStride = outerDim.stride;
                    if (clampedStride > currDatasetSize)
                        clampedStride = currDatasetSize;
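                    // For example (hypothetical values): if the global stop is 25, this
                    // granule's head index is 20 and its size is 10, the mapped stop 5 is
                    // kept; if the head were 0, the mapped stop 25 would be clamped to 9.
                    // The stride is clamped the same way so add_constraint() never sees a
                    // stride larger than the granule itself.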
239
240 // mapped endpoint clamped within this granule
241 granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride,
242 granuleStopIndex);
244 getGranuleTemplateArray(), name(), const_cast<AggMemberDataset&>(*pCurrDataset),
245 getArrayGetterInterface(), DEBUG_CHANNEL);
246#if PIPELINING
247 m.put_vector_part(pDatasetArray->get_buf(), getGranuleTemplateArray().length(), var()->width(),
248 var()->type());
249#else
250 this->set_value_slice_from_row_major_vector(*pDatasetArray, nextOutputBufferElementIndex);
251#endif
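                    // Either way the granule's values end up in the response: the pipelined
                    // branch hands the granule's raw buffer straight to the Marshaller, while
                    // the non-pipelined branch copies it into this Array's value buffer at the
                    // current output offset for a single serialize() at the end.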

                    pDatasetArray->clear_local_data();

                    // Jump output buffer index forward by the amount we added.
                    nextOutputBufferElementIndex += getGranuleTemplateArray().length();
                    currDatasetWasRead = true;

                    BESDEBUG_FUNC(DEBUG_CHANNEL,
                        " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
                } // !currDatasetWasRead
            } // for loop over outerDim
        } // end of try
        catch (AggregationException& ex) {
            THROW_NCML_PARSE_ERROR(-1, ex.what());
        }

        // *** end of code inserted from readConstrainedGranuleArraysAndAggregateDataHook

#if PIPELINING
        m.put_vector_end();
        status = true;
#else
        set_read_p(true);
        status = libdap::Array::serialize(eval, dds, m, ce_eval);
#endif
    }
    else {
        status = libdap::Array::serialize(eval, dds, m, ce_eval);
    }

    return status;
}

// Private Impl Below

void ArrayJoinExistingAggregation::duplicate(const ArrayJoinExistingAggregation& rhs)
{
    _joinDim = rhs._joinDim;
}

void ArrayJoinExistingAggregation::cleanup() noexcept
{
}

/* virtual */
void ArrayJoinExistingAggregation::transferOutputConstraintsIntoGranuleTemplateHook()
{
    // transfer the constraints from this object into the subArray template,
    // skipping our first dim, which is the join dim we need to handle specially for every
    // granule in the read hook.
    agg_util::AggregationUtil::transferArrayConstraints(&(getGranuleTemplateArray()),
        *this, // from this
        true, // skip first dim in the copy since we handle it special
        true, // also skip it in the toArray for the same reason.
        true, // print debug
        DEBUG_CHANNEL); // on this channel
}

void ArrayJoinExistingAggregation::readConstrainedGranuleArraysAndAggregateDataHook()
{
    BES_STOPWATCH_START(DEBUG_CHANNEL, prolog + "Timing");

    // outer one is the first in iteration
    const Array::dimension& outerDim = *(dim_begin());
    BESDEBUG("ncml",
        "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);

    try {
        // assumes the constraints are already set properly on this
        reserve_value_capacity();
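        // Unlike the pipelined path in serialize() above, this hook pre-allocates the
        // full constrained output and copies each granule's values into it; the filled
        // Array can then be serialized by the standard libdap machinery.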

        // Start the iteration state for the granule.
        const AMDList& datasets = getDatasetList(); // the list
        NCML_ASSERT(!datasets.empty());
        int currDatasetIndex = 0; // index into datasets
        const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();

        int outerDimIndexOfCurrDatasetHead = 0;
        int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
        bool currDatasetWasRead = false;

        // where in this output array we are writing next
        unsigned int nextOutputBufferElementIndex = 0;

        // Traverse the outer dimension constraints,
        // Keeping track of which dataset we need to
        // be inside for the given values of the constraint.
        for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
            outerDimIndex += outerDim.stride) {
            // Figure out where the given outer index maps to in local granule space
            int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;

            // if this is beyond the dataset end, move state to the next dataset
            // and try again until we're in the proper interval, with the proper dataset.
            while (localGranuleIndex >= currDatasetSize) {
                localGranuleIndex -= currDatasetSize;
                outerDimIndexOfCurrDatasetHead += currDatasetSize;
                ++currDatasetIndex;
                NCML_ASSERT(currDatasetIndex < int(datasets.size()));
                pCurrDataset = datasets[currDatasetIndex].get();
                currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
                currDatasetWasRead = false;

                BESDEBUG_FUNC(DEBUG_CHANNEL,
                    "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
            }

            // If we haven't read in this granule yet (we passed a boundary)
            // then do it now. Map constraints into the local granule space.
            if (!currDatasetWasRead) {
                BESDEBUG_FUNC(DEBUG_CHANNEL,
                    " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);

                // Set up a constraint object for the actual granule read
                // so that it only loads the data values in which we are
                // interested.
                Array& granuleConstraintTemplate = getGranuleTemplateArray();

                // The inner dim constraints were set up in the containing read() call.
                // The outer dim was left open for us to fix now...
                Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();

                // Modify the outer dim size to match the dataset we need to
                // load. The inners MUST match so we can let those get
                // checked later...
                outerDimIt->size = currDatasetSize;
                outerDimIt->c_size = currDatasetSize; // this will get recalculated below?

                // Find the mapped endpoint.
                // Basically, the fullspace endpoint mapped to the local offset,
                // clamped into the local granule size.

                // TODO: we will rewrite the following when we need to add the big array support.
                // For now we replace the std::min() with our own version to avoid the
                // inconsistent type comparison. KY 2022-12-28
#if 0
                int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead, (unsigned long long)currDatasetSize - 1);
#endif
                int granuleStopIndex = outerDim.stop - outerDimIndexOfCurrDatasetHead;
                if (granuleStopIndex > (currDatasetSize - 1))
                    granuleStopIndex = currDatasetSize - 1;

                // We must clamp the stride to the interval of the
                // dataset in order to avoid an exception in
                // add_constraint on stride being larger than the dataset.

                // TODO: we will rewrite the following when we need to add the big array support.
                // For now we replace the std::min() with our own version to avoid the
                // inconsistent type comparison. KY 2022-12-28
#if 0
                int clampedStride = std::min(outerDim.stride, (unsigned long long) currDatasetSize);
#endif
                int clampedStride = outerDim.stride;
                if (clampedStride > currDatasetSize)
                    clampedStride = currDatasetSize;

                // mapped endpoint clamped within this granule
                granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride, granuleStopIndex);

                // Do the constrained read and copy it into this output buffer
                agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this, // into the output buffer of this object
                    nextOutputBufferElementIndex, // into the next open slice
                    getGranuleTemplateArray(), // constraints we just set up
                    name(), // aggvar name
                    const_cast<AggMemberDataset&>(*pCurrDataset), // dataset whose DDS should be searched
                    getArrayGetterInterface(), DEBUG_CHANNEL);

                // Jump output buffer index forward by the amount we added.
                nextOutputBufferElementIndex += getGranuleTemplateArray().length();
                currDatasetWasRead = true;

                BESDEBUG_FUNC(DEBUG_CHANNEL,
                    " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
            } // !currDatasetWasRead
        } // for loop over outerDim
    } // try

    catch (AggregationException& ex) {
        THROW_NCML_PARSE_ERROR(-1, ex.what());
    }

}

} // namespace agg_util
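The core of both aggregation paths above is the mapping from a constrained global outer-dimension index to a (granule, local index) pair. The following standalone sketch walks the same arithmetic outside of libdap; the granule sizes and constraint values are hypothetical, chosen only for illustration, and none of the names come from the module itself.

#include <cassert>
#include <iostream>
#include <vector>

// Stand-in for the per-granule outer-dimension sizes that the module obtains
// from the member datasets. Hypothetical values for illustration only.
static const std::vector<int> granuleSizes = {10, 10, 10};

int main()
{
    // Hypothetical hyperslab constraint on the aggregated (outer) dimension.
    const int start = 5, stride = 7, stop = 25;

    int currGranule = 0;            // which granule we are currently inside
    int headIndex = 0;              // global index of that granule's first element
    int currSize = granuleSizes[0]; // outer size of the current granule

    for (int outerIndex = start; outerIndex <= stop; outerIndex += stride) {
        // Map the global index into the current granule's local index space.
        int localIndex = outerIndex - headIndex;

        // Step forward across granule boundaries until the local index fits,
        // mirroring the while loop in the aggregation code above.
        while (localIndex >= currSize) {
            localIndex -= currSize;
            headIndex += currSize;
            ++currGranule;
            assert(currGranule < static_cast<int>(granuleSizes.size()));
            currSize = granuleSizes[currGranule];
        }

        std::cout << "global index " << outerIndex << " -> granule " << currGranule
                  << ", local index " << localIndex << "\n";
    }
    return 0;
}

With these hypothetical inputs the program prints 5 -> (granule 0, local 5), 12 -> (granule 1, local 2) and 19 -> (granule 1, local 9), matching the worked example in the comments above.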