bes Updated for version 3.21.1
The Backend Server (BES) is the lower two tiers of the Hyrax data server
GlobalMetadataStore.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of Hyrax, a C++ implementation of the OPeNDAP Data
4// Access Protocol.
5
6// Copyright (c) 2018 OPeNDAP, Inc.
7// Author: James Gallagher <jgallagher@opendap.org>
8//
9// This library is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// This library is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22//
23// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24
25#include "config.h"
26
27#include <fcntl.h> // for posix_advise
28#include <unistd.h>
29
30#include <cerrno>
31#include <cstring>
32
33#include <iostream>
34#include <string>
35#include <fstream>
36#include <sstream>
37#include <functional>
38#include <libdap/DAS.h>
39#include <memory>
40#include <sys/stat.h>
41
42#include <libdap/DapObj.h>
43#include <libdap/DDS.h>
44#include <libdap/DMR.h>
45#include <libdap/D4ParserSax2.h>
46#include <libdap/XMLWriter.h>
47#include <libdap/BaseTypeFactory.h>
48#include <libdap/D4BaseTypeFactory.h>
49
50#include "PicoSHA2/picosha2.h"
51
52#include "TempFile.h"
53#include "TheBESKeys.h"
54#include "BESUtil.h"
55#include "BESLog.h"
56#include "BESContextManager.h"
57#include "BESDebug.h"
58#include "BESRequestHandler.h"
59#include "BESRequestHandlerList.h"
60#include "BESNotFoundError.h"
61
62#include "BESInternalError.h"
63#include "BESInternalFatalError.h"
64
65#include "GlobalMetadataStore.h"
66
67#define DEBUG_KEY "metadata_store"
68#define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
69
70#ifdef HAVE_ATEXIT
71#define AT_EXIT(x) atexit((x))
72#else
73#define AT_EXIT(x)
74#endif
75
85#undef SYMMETRIC_ADD_RESPONSES
86
87#define prolog std::string("GlobalMetadataStore::").append(__func__).append("() - ")
88
89using namespace std;
90using namespace libdap;
91using namespace bes;
92
93static const unsigned int default_cache_size = 20; // 20 GB
94static const string default_cache_prefix = "mds";
95static const string default_cache_dir = ""; // I'm making the default empty so that no key == no caching. jhrg 9.26.16
96static const string default_ledger_name = "mds_ledger.txt";
97
98static const string PATH_KEY = "DAP.GlobalMetadataStore.path";
99static const string PREFIX_KEY = "DAP.GlobalMetadataStore.prefix";
100static const string SIZE_KEY = "DAP.GlobalMetadataStore.size";
101static const string LEDGER_KEY = "DAP.GlobalMetadataStore.ledger";
102static const string LOCAL_TIME_KEY = "BES.LogTimeLocal";
103
104GlobalMetadataStore *GlobalMetadataStore::d_instance = 0;
105bool GlobalMetadataStore::d_enabled = true;
106
121void GlobalMetadataStore::transfer_bytes(int fd, ostream &os)
122{
123 static const int BUFFER_SIZE = 16*1024;
124
125#if _POSIX_C_SOURCE >= 200112L
126 /* Advise the kernel of our access pattern. */
127 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
128 if (status != 0)
129 ERROR_LOG(prolog + "Error calling posix_advise() in the GlobalMetadataStore: " + strerror(status));
130#endif
131
132 char buf[BUFFER_SIZE + 1];
133
134 while(int bytes_read = read(fd, buf, BUFFER_SIZE))
135 {
136 if(bytes_read == -1)
137 throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
138 if (!bytes_read)
139 break;
140
141 os.write(buf, bytes_read);
142 }
143}
144
157void GlobalMetadataStore::insert_xml_base(int fd, ostream &os, const string &xml_base)
158{
159 static const int BUFFER_SIZE = 1024;
160
161#if _POSIX_C_SOURCE >= 200112L
162 /* Advise the kernel of our access pattern. */
163 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
164 if (status != 0)
165 ERROR_LOG(prolog + "Error calling posix_advise() in the GlobalMetadataStore: " + strerror(status));
166#endif
167
168 char buf[BUFFER_SIZE + 1];
169 size_t bytes_read = read(fd, buf, BUFFER_SIZE);
170
171 if(bytes_read == (size_t)-1)
172 throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
173
174 if (bytes_read == 0)
175 return;
176
177 // Every valid DMR/++ response in the MDS starts with:
178 // <?xml version="1.0" encoding="ISO‌-8859-1"?>
179 //
180 // and has one of two kinds of <Dataset...> tags
181 // 1: <Dataset xmlns="..." xml:base="file:DMR_1.xml" ... >
182 // 2: <Dataset xmlns="..." ... >
183 //
184 // Assume it is well formed and always includes the prolog,
185 // but might not use <CR> <CRLF> chars
186
187 // transfer the prolog (<?xml version="1.0" encoding="ISO‌-8859-1"?>)
188 size_t i = 0;
189 while (buf[i++] != '>')
190 ; // 'i' now points one char past the xml prolog
191 os.write(buf, i);
192
193 // transfer <Dataset ...> with new value for xml:base
194 size_t s = i; // start of <Dataset ...>
195 size_t j = 0;
196 char xml_base_literal[] = "xml:base";
197 while (i < bytes_read) {
198 if (buf[i] == '>') { // Found end of Dataset; no xml:base was present
199 os.write(buf + s, i - s);
200 os << " xml:base=\"" << xml_base << "\"";
201 break;
202 }
203 else if (j == sizeof(xml_base_literal) - 1) { // found 'xml:base' literal
204 os.write(buf + s, i - s); // This will include all of <Dataset... including 'xml:base'
205 while (buf[i++] != '=')
206 ; // read/discard '="..."'
207 while (buf[i++] != '"')
208 ;
209 while (buf[i++] != '"')
210 ;
211 os << "=\"" << xml_base << "\""; // write the new xml:base value
212 break;
213 }
214 else if (buf[i] == xml_base_literal[j]) {
215 ++j;
216 }
217 else {
218 j = 0;
219 }
220
221 ++i;
222 }
223
224 // transfer the rest
225 os.write(buf + i, bytes_read - i);
226
227 // Now, if the response is more than 1k, use faster code to finish the tx
228 transfer_bytes(fd, os);
229}
230
231unsigned long GlobalMetadataStore::get_cache_size_from_config()
232{
233 bool found;
234 string size;
235 unsigned long size_in_megabytes = default_cache_size;
236 TheBESKeys::TheKeys()->get_value(SIZE_KEY, size, found);
237 if (found) {
238 BESDEBUG(DEBUG_KEY,
239 "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY << "=" << size << endl);
240 istringstream iss(size);
241 iss >> size_in_megabytes;
242 }
243
244 return size_in_megabytes;
245}
246
247string GlobalMetadataStore::get_cache_prefix_from_config()
248{
249 bool found;
250 string prefix = default_cache_prefix;
251 TheBESKeys::TheKeys()->get_value(PREFIX_KEY, prefix, found);
252 if (found) {
253 BESDEBUG(DEBUG_KEY,
254 "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY << "=" << prefix << endl);
255 prefix = BESUtil::lowercase(prefix);
256 }
257
258 return prefix;
259}
260
261// If the cache prefix is the empty string, the cache is turned off.
262string GlobalMetadataStore::get_cache_dir_from_config()
263{
264 bool found;
265
266 string cacheDir = default_cache_dir;
267 TheBESKeys::TheKeys()->get_value(PATH_KEY, cacheDir, found);
268 if (found) {
269 BESDEBUG(DEBUG_KEY,
270 "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<< "=" << cacheDir << endl);
271 }
272
273 return cacheDir;
274}
275
290
308GlobalMetadataStore::get_instance(const string &cache_dir, const string &prefix, unsigned long long size)
309{
310 if (d_enabled && d_instance == 0) {
311 d_instance = new GlobalMetadataStore(cache_dir, prefix, size); // never returns null_ptr
312 d_enabled = d_instance->cache_enabled();
313 if (!d_enabled) {
314 delete d_instance;
315 d_instance = 0;
316
317 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
318 }
319 else {
320 AT_EXIT(delete_instance);
321
322 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
323 }
324 }
325
326 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
327
328 return d_instance;
329}
330
339{
340 if (d_enabled && d_instance == 0) {
341 d_instance = new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
342 get_cache_size_from_config());
343 d_enabled = d_instance->cache_enabled();
344 if (!d_enabled) {
345 delete d_instance;
346 d_instance = nullptr;
347 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
348 }
349 else {
350 AT_EXIT(delete_instance);
351
352 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
353 }
354 }
355
356 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance() - d_instance: " << (void *) d_instance << endl);
357
358 return d_instance;
359}
360
361
365void
367{
368 bool found;
369
370 TheBESKeys::TheKeys()->get_value(LEDGER_KEY, d_ledger_name, found);
371 if (found) {
372 BESDEBUG(DEBUG_KEY, "Located BES key " << LEDGER_KEY << "=" << d_ledger_name << endl);
373 }
374 else {
375 d_ledger_name = default_ledger_name;
376 }
377
378 ofstream of(d_ledger_name.c_str(), ios::app);
379
380 // By default, use UTC in the logs.
381 string local_time = "no";
382 TheBESKeys::TheKeys()->get_value(LOCAL_TIME_KEY, local_time, found);
383 d_use_local_time = (local_time == "YES" || local_time == "Yes" || local_time == "yes");
384}
385
402 : BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
403{
404 initialize();
405}
406
407GlobalMetadataStore::GlobalMetadataStore(const string &cache_dir, const string &prefix,
408 unsigned long long size) : BESFileLockingCache(cache_dir, prefix, size)
409{
410 initialize();
411}
413
418static void dump_time(ostream &os, bool use_local_time)
419{
420 time_t now;
421 time(&now);
422 char buf[sizeof "YYYY-MM-DDTHH:MM:SSzone"];
423 int status = 0;
424
425 // From StackOverflow:
426 // This will work too, if your compiler doesn't support %F or %T:
427 // strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%S%Z", gmtime(&now));
428 //
429 // Apologies for the twisted logic - UTC is the default. Override to
430 // local time using BES.LogTimeLocal=yes in bes.conf. jhrg 11/15/17
431 struct tm result{};
432 if (!use_local_time) {
433 gmtime_r(&now, &result);
434 status = strftime(buf, sizeof buf, "%FT%T%Z", &result);
435 }
436 else {
437 localtime_r(&now, &result);
438 status = strftime(buf, sizeof buf, "%FT%T%Z", &result);
439 }
440 if (!status)
441 ERROR_LOG(prolog + "Error getting time for Metadata Store ledger.");
442
443 os << buf;
444}
445
449void
451{
452 // open just once, <- done SBL 11.7.19
453
454 int fd; // value-result parameter;
455 if (get_exclusive_lock(d_ledger_name, fd)) {
456 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Ledger " << d_ledger_name << " write locked." << endl);
457 if (of) {
458 try {
459 dump_time(of, d_use_local_time);
460 of << " " << d_ledger_entry << endl;
461 VERBOSE("MDS Ledger name: '" + d_ledger_name + "', entry: '" + d_ledger_entry + "'.");
462 unlock_and_close(d_ledger_name); // closes fd
463 }
464 catch (...) {
465 unlock_and_close(d_ledger_name);
466 throw;
467 }
468 }
469 else {
470 ERROR_LOG(prolog + "Warning: Metadata store could not write to its ledger file.");
471 unlock_and_close(d_ledger_name);
472 }
473 }
474 else {
475 throw BESInternalError("Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
476 }
477}
478
485string
487{
488 if (name.empty())
489 throw BESInternalError("Empty name passed to the Metadata Store.", __FILE__, __LINE__);
490
491 return picosha2::hash256_hex_string(name[0] == '/' ? name : "/" + name);
492}
493
511{
512 if (d_dds) {
513 D4BaseTypeFactory factory;
514 DMR dmr(&factory, *d_dds);
515 XMLWriter xml;
516 dmr.print_dap4(xml);
517 os << xml.get_doc();
518 }
519 else if (d_dmr) {
520 if (d_dmr->get_utf8_xml_encoding()) {
521 auto xml = XMLWriter(" ","UTF-8");
522 d_dmr->print_dap4(xml);
523 os << xml.get_doc();
524 }
525 else {
526 XMLWriter xml;
527 d_dmr->print_dap4(xml);
528 os << xml.get_doc();
529 }
530 }
531 else {
532 throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
533 }
534}
535
538 if (d_dds)
539 d_dds->print(os);
540 else if (d_dmr)
541 d_dmr->getDDS()->print(os);
542 else
543 throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
544}
545
548 if (d_dds)
549 d_dds->print_das(os);
550 else if (d_dmr)
551 d_dmr->getDDS()->print_das(os);
552 else
553 throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
554}
555
570bool
571GlobalMetadataStore::store_dap_response(StreamDAP &writer, const string &key, const string &name,
572 const string &response_name)
573{
574 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " BEGIN " << key << endl);
575
576 string item_name = get_cache_file_name(key, false /*mangle*/);
577
578 int fd;
579 if (create_and_lock(item_name, fd)) {
580 BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Storing " << item_name << endl);
581
582 // Get an output stream directed at the locked cache file
583 ofstream response(item_name.c_str(), ios::out|ios::app);
584 if (!response.is_open())
585 throw BESInternalError("Could not open '" + key + "' to write the response.", __FILE__, __LINE__);
586
587 try {
588 // for the different writers, look at the StreamDAP struct in the class
589 // definition. jhrg 2.27.18
590 writer(response); // different writers can write the DDS, DAS or DMR
591
592 // Compute/update/maintain the cache size? This is extra work
593 // that might never be used. It also locks the cache...
594 if (!is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
595 // This enables the call to update_cache_info() below.
597
598 unsigned long long size = update_cache_info(item_name);
599 if (!is_unlimited() && cache_too_big(size)) update_and_purge(item_name);
600 }
601
602 unlock_and_close(item_name);
603 }
604 catch (...) {
605 // Bummer. There was a problem doing The Stuff. Now we gotta clean up.
606 response.close();
607 this->purge_file(item_name);
608 unlock_and_close(item_name);
609 throw;
610 }
611
612 VERBOSE("Metadata store: Wrote " + response_name + " response for '" + name + "'.");
613 d_ledger_entry.append(" ").append(key);
614
615 return true;
616 }
617 else if (get_read_lock(item_name, fd)) {
618 // We found the key; didn't add this because it was already here
619 BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Found " << item_name << " in the store already." << endl);
620 unlock_and_close(item_name);
621
622 ERROR_LOG(prolog + "Metadata store: unable to store the " + response_name + " response for '" + name + "'.");
623
624 return false;
625 }
626 else {
627 throw BESInternalError("Could neither create or open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
628 }
629}
630
645
657bool
658GlobalMetadataStore::add_responses(DDS *dds, const string &name)
659{
660 // Start the index entry
661 d_ledger_entry = string("add DDS ").append(name);
662
663 // I'm appending the 'dds r' string to the name before hashing so that
664 // the different hashes for the file's DDS, DAS, ..., are all very different.
665 // This will be useful if we use S3 instead of EFS for the Metadata Store.
666 //
667 // The helper() also updates the ledger string.
668 StreamDDS write_the_dds_response(dds);
669 bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
670
671 StreamDAS write_the_das_response(dds);
672 bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
673
674#if SYMMETRIC_ADD_RESPONSES
675 StreamDMR write_the_dmr_response(dds);
676 bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
677#endif
678
679 write_ledger(); // write the index line
680
681#if SYMMETRIC_ADD_RESPONSES
682 return (stored_dds && stored_das && stored_dmr);
683#else
684 return (stored_dds && stored_das);
685#endif
686}
687
699bool
700GlobalMetadataStore::add_responses(DMR *dmr, const string &name)
701{
702 // Start the index entry
703 d_ledger_entry = string("add DMR ").append(name);
704
705 // I'm appending the 'dds r' string to the name before hashing so that
706 // the different hashes for the file's DDS, DAS, ..., are all very different.
707 // This will be useful if we use S3 instead of EFS for the Metadata Store.
708 //
709 // The helper() also updates the ledger string.
710#if SYMMETRIC_ADD_RESPONSES
711 StreamDDS write_the_dds_response(dmr);
712 bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
713
714 StreamDAS write_the_das_response(dmr);
715 bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
716#endif
717
718 StreamDMR write_the_dmr_response(dmr);
719 bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
720
721 write_ledger(); // write the index line
722
723#if SYMMETRIC_ADD_RESPONSES
724 return (stored_dds && stored_das && stored_dmr);
725#else
726 return(stored_dmr /* && stored_dmrpp */);
727#endif
728}
729
730
744GlobalMetadataStore::get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
745{
746 BESDEBUG(DEBUG_KEY, __func__ << "() MDS hashing name '" << name << "', '" << suffix << "'"<< endl);
747
748 if (name.empty())
749 throw BESInternalError("An empty name string was received by "
750 "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
751
752 string item_name = get_cache_file_name(get_hash(name + suffix), false);
753 int fd;
754 MDSReadLock lock(item_name, get_read_lock(item_name, fd), this);
755 BESDEBUG(DEBUG_KEY, __func__ << "() MDS lock for " << item_name << ": " << lock() << endl);
756
757 string hit_or_miss="miss";
758 if (lock()) {
759 hit_or_miss = "hit";
760 }
761 INFO_LOG(prolog + "MDS Cache " + hit_or_miss + " for '" + name + "' and response " + object_name);
762
763 return lock;
764 }
765
786{
787 return get_read_lock_helper(name,"dmr_r","DMR");
788}//end is_dmr_available(string)
789
792{
793 //call get_read_lock_helper
794 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(), "dmr_r", "DMR");
795 if (lock()){
796
797 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmr_r");
798
799 if(reload){
800 lock.clearLock();
801 return lock;
802 }//end if
803 else{
804 return lock;
805 }//end else
806
807 }//end if(is locked)
808 else{
809 return lock;
810 }//end else
811
812}//end is_dmr_available(BESContainer)
813
814GlobalMetadataStore::MDSReadLock
815GlobalMetadataStore::is_dmr_available(const std::string &realName, const std::string &relativeName, const std::string &fileType)
816{
817 //call get_read_lock_helper
818 MDSReadLock lock = get_read_lock_helper(relativeName,"dmr_r","DMR");
819 if (lock()){
820
821 bool reload = is_available_helper(realName, relativeName, fileType, "dmr_r");
822
823 if(reload){
824 lock.clearLock();
825 return lock;
826 }//end if
827 else{
828 return lock;
829 }//end else
830
831 }//end if(is locked)
832 else{
833 return lock;
834 }//end else
835
836}//end is_dmr_available(string, string, string)
837
845GlobalMetadataStore::MDSReadLock
847{
848 return get_read_lock_helper(name,"dds_r","DDS");
849}//end is_dds_available(string)
850
860
863{
864 //call get_read_lock_helper
865 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dds_r","DDS");
866 if (lock()){
867
868 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dds_r");
869
870 if(reload){
871 lock.clearLock();
872 return lock;
873 }//end if
874 else{
875 return lock;
876 }//end else
877
878 }//end if(is locked)
879 else{
880 return lock;
881 }//end else
882
883}//end is_dds_available(BESContainer)
884
894{
895 return get_read_lock_helper(name,"das_r","DAS");
896}//end is_das_available(string)
897
900{
901 //return get_read_lock_helper(name, "das_r", "DAS");
902 //call get_read_lock_helper
903 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"das_r","DAS");
904 if (lock()){
905
906 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "das_r");
907
908 if(reload){
909 lock.clearLock();
910 return lock;
911 }//end if
912 else{
913 return lock;
914 }//end else
915
916 }//end if(is locked)
917 else{
918 return lock;
919 }//end else
920
921}//end is_das_available(BESContainer)
922
941GlobalMetadataStore::MDSReadLock
943{
944 return get_read_lock_helper(name,"dmrpp_r","DMR++");
945}//end is_dmrpp_available(string)
946
949{
950 //return get_read_lock_helper(name, "dmrpp_r", "DMR++");
951 //call get_read_lock_helper
952 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dmrpp_r","DMR++");
953 if (lock()){
954
955 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmrpp_r");
956
957 if(reload){
958 lock.clearLock();
959 return lock;
960 }//end if
961 else{
962 return lock;
963 }//end else
964
965 }//end if(is locked)
966 else{
967 return lock;
968 }//end else
969
970}//end is_dmrpp_available(BESContainer)
971
982bool
983GlobalMetadataStore::is_available_helper(const string &realName, const string &relativeName, const string &fileType, const string &suffix)
984{
985 //use type with find_handler() to get handler
986 BESRequestHandler *besRH = BESRequestHandlerList::TheList()->find_handler(fileType);
987
988 //use handler.get_lmt()
989 time_t file_time = besRH->get_lmt(realName);
990
991 //get the cache time of the handler
992 time_t cache_time = get_cache_lmt(relativeName, suffix);
993
994 //compare file lmt and time of creation of cache
995 if (file_time > cache_time){
996 return true;
997 }//end if(file > cache)
998 else {
999 return false;
1000 }//end else
1001}
1002
1010time_t
1011GlobalMetadataStore::get_cache_lmt(const string &fileName, const string &suffix)
1012{
1013 string item_name = get_cache_file_name(get_hash(fileName + suffix), false);
1014 struct stat statbuf;
1015
1016 if (stat(item_name.c_str(), &statbuf) == -1){
1017 throw BESNotFoundError(strerror(errno), __FILE__, __LINE__);
1018 }//end if(error)
1019
1020 return statbuf.st_mtime;
1021}//end get_cache_lmt()
1022
1025
1033void
1034GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &object_name)
1035{
1036 string item_name = get_cache_file_name(get_hash(name + suffix), false);
1037 int fd; // value-result parameter;
1038 if (get_read_lock(item_name, fd)) {
1039 VERBOSE("Metadata store: Cache hit: read " + object_name + " response for '" + name + "'.");
1040 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1041 try {
1042 transfer_bytes(fd, os);
1043 unlock_and_close(item_name); // closes fd
1044 }
1045 catch (...) {
1046 unlock_and_close(item_name);
1047 throw;
1048 }
1049 }
1050 else {
1051 throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1052 }
1053}
1054
1063void
1064GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &xml_base,
1065 const string &object_name)
1066{
1067 string item_name = get_cache_file_name(get_hash(name + suffix), false);
1068 int fd; // value-result parameter;
1069 if (get_read_lock(item_name, fd)) {
1070 VERBOSE("Metadata store: Cache hit: read " + object_name + " response for '" + name + "'.");
1071 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1072 try {
1073 insert_xml_base(fd, os, xml_base);
1074
1075 transfer_bytes(fd, os);
1076 unlock_and_close(item_name); // closes fd
1077 }
1078 catch (...) {
1079 unlock_and_close(item_name);
1080 throw;
1081 }
1082 }
1083 else {
1084 throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1085 }
1086}
1087
1088
1095void
1096GlobalMetadataStore::write_dds_response(const std::string &name, ostream &os)
1097{
1098 write_response_helper(name, os, "dds_r", "DDS");
1099}
1100
1107void
1108GlobalMetadataStore::write_das_response(const std::string &name, ostream &os)
1109{
1110 write_response_helper(name, os, "das_r", "DAS");
1111}
1112
1119void
1120GlobalMetadataStore::write_dmr_response(const std::string &name, ostream &os)
1121{
1122 bool found = false;
1123 string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1124 if (!found) {
1125#if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1126 write_response_helper(name, os, "dmr_r", "DMR");
1127#else
1128 throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1129#endif
1130 }
1131 else {
1132 write_response_helper(name, os, "dmr_r", xml_base, "DMR");
1133 }
1134}
1135
1142void
1143GlobalMetadataStore::write_dmrpp_response(const std::string &name, ostream &os)
1144{
1145 bool found = false;
1146 string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1147 if (!found) {
1148#if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1149 write_response_helper(name, os, "dmrpp_r", "DMR++");
1150#else
1151 throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1152#endif
1153 }
1154 else {
1155 write_response_helper(name, os, "dmrpp_r", xml_base, "DMR++");
1156 }
1157}
1158
1166bool
1167GlobalMetadataStore::remove_response_helper(const string& name, const string &suffix, const string &object_name)
1168{
1169 string hash = get_hash(name + suffix);
1170 if (unlink(get_cache_file_name(hash, false).c_str()) == 0) {
1171 VERBOSE("Metadata store: Removed " + object_name + " response for '" + hash + "'.");
1172 d_ledger_entry.append(" ").append(hash);
1173 return true;
1174 }
1175 else {
1176 ERROR_LOG(prolog + "Metadata store: unable to remove the " + object_name + " response for '" + name
1177 + "' (" + strerror(errno) + ").");
1178 }
1179
1180 return false;
1181}
1182
1189bool
1191{
1192 // Start the index entry
1193 d_ledger_entry = string("remove ").append(name);
1194
1195 bool removed_dds = remove_response_helper(name, "dds_r", "DDS");
1196
1197 bool removed_das = remove_response_helper(name, "das_r", "DAS");
1198
1199 bool removed_dmr = remove_response_helper(name, "dmr_r", "DMR");
1200
1201 bool removed_dmrpp = remove_response_helper(name, "dmrpp_r", "DMR++");
1202
1203 write_ledger(); // write the index line
1204
1205#if SYMMETRIC_ADD_RESPONSES
1206 return (removed_dds && removed_das && removed_dmr);
1207#else
1208 return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1209#endif
1210}
1211
1224DMR *
1226{
1227 stringstream oss;
1228 write_dmr_response(name, oss); // throws BESInternalError if not found
1229
1230 D4BaseTypeFactory d4_btf;
1231 unique_ptr<DMR> dmr(new DMR(&d4_btf, "mds"));
1232
1233 D4ParserSax2 parser;
1234 parser.intern(oss.str(), dmr.get());
1235
1236 dmr->set_factory(0);
1237
1238 return dmr.release();
1239}
1240
1262DDS *
1264{
1265 TempFile dds_tmp;
1266 string dds_tmp_name = dds_tmp.create(get_cache_directory(), "mds_dds");
1267
1268 fstream dds_fs(dds_tmp_name.c_str(), std::fstream::out);
1269 try {
1270 write_dds_response(name, dds_fs); // throws BESInternalError if not found
1271 dds_fs.close();
1272 }
1273 catch (...) {
1274 dds_fs.close();
1275 throw;
1276 }
1277
1278 BaseTypeFactory btf;
1279 unique_ptr<DDS> dds(new DDS(&btf));
1280 dds->parse(dds_tmp_name);
1281
1282 TempFile das_tmp;
1283 string das_tmp_name = das_tmp.create(get_cache_directory(), "mds_das");
1284 fstream das_fs(das_tmp_name.c_str(), std::fstream::out);
1285 try {
1286 write_das_response(name, das_fs); // throws BESInternalError if not found
1287 das_fs.close();
1288 }
1289 catch (...) {
1290 das_fs.close();
1291 throw;
1292 }
1293
1294 unique_ptr<DAS> das(new DAS());
1295 das->parse(das_tmp_name);
1296
1297 dds->transfer_attributes(das.get());
1298 dds->set_factory(nullptr);
1299
1300 return dds.release();
1301}
1302
1303
1304void
1305GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das, const std::string &name) {
1306 string suffix = "das_r";
1307 string item_name = get_cache_file_name(get_hash(name + suffix), false);
1308 int fd; // value-result parameter;
1309 if (get_read_lock(item_name, fd)) {
1310 VERBOSE("Metadata store: Cache hit: read response for '" + name + "'.");
1311 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1312 try {
1313 // Just generate the DAS by parsing from the file
1314 das->parse(item_name);
1315 unlock_and_close(item_name); // closes fd
1316 }
1317 catch (...) {
1318 unlock_and_close(item_name);
1319 throw;
1320 }
1321 }
1322 else {
1323 throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1324 }
1325
1326}
1327
A container is something that holds data. E.G., a netcdf file or a database entry.
std::string get_relative_name() const
Get the relative name of the object in this container.
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
Implementation of a caching mechanism for compressed data.
virtual void unlock_and_close(const std::string &target)
std::string get_cache_directory() const
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual bool get_exclusive_lock(const std::string &target, int &fd)
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
error thrown if the resource requested cannot be found
Represents a specific data type request handler.
virtual time_t get_lmt(const std::string &name)
Get the last modified time for the named item.
static std::string lowercase(const std::string &s)
Definition BESUtil.cc:257
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
static TheBESKeys * TheKeys()
Access to the singleton.
Definition TheBESKeys.cc:85
Store the DAP metadata responses.
virtual libdap::DDS * get_dds_object(const std::string &name)
Build a DDS object from the cached Response.
virtual void write_dmr_response(const std::string &name, std::ostream &os)
Write the stored DMR response to a stream.
void write_response_helper(const std::string &name, std::ostream &os, const std::string &suffix, const std::string &object_name)
std::string get_hash(const std::string &name)
static void transfer_bytes(int fd, std::ostream &os)
virtual bool remove_responses(const std::string &name)
Remove all cached responses and objects for a granule.
virtual void write_dds_response(const std::string &name, std::ostream &os)
Write the stored DDS response to a stream.
void initialize()
Configure the ledger using LEDGER_KEY and LOCAL_TIME_KEY.
static GlobalMetadataStore * get_instance()
bool store_dap_response(StreamDAP &writer, const std::string &key, const std::string &name, const std::string &response_name)
static void insert_xml_base(int fd, std::ostream &os, const std::string &xml_base)
like transfer_bytes(), but adds the xml:base attribute to the DMR/++
virtual void write_das_response(const std::string &name, std::ostream &os)
Write the stored DAS response to a stream.
bool remove_response_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual MDSReadLock is_dmrpp_available(const std::string &name)
Is the DMR++ response for the named item available in the store?
virtual MDSReadLock is_dds_available(const std::string &name)
Is the DDS response for the named item available in the store?
virtual libdap::DMR * get_dmr_object(const std::string &name)
Build a DMR object from the cached Response.
MDSReadLock get_read_lock_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual time_t get_cache_lmt(const std::string &fileName, const std::string &suffix)
Get the last modified time for the cached object file.
virtual MDSReadLock is_dmr_available(const std::string &name)
Is the DMR response for the named item available in the store?
virtual MDSReadLock is_das_available(const std::string &name)
Is the DAS response for the named item available in the store?
virtual bool is_available_helper(const std::string &realName, const std::string &relativeName, const std::string &fileType, const std::string &suffix)
helper function that checks if last modified time is greater than cached file
virtual bool add_responses(libdap::DDS *dds, const std::string &name)
Add the DAP2 metadata responses using a DDS.
virtual void write_dmrpp_response(const std::string &name, std::ostream &os)
Write the stored DMR++ response to a stream.
Get a new temporary file.
Definition TempFile.h:43
std::string create(const std::string &dir_name="/tmp/hyrax_tmp", const std::string &path_template="opendap")
Create a new temporary file.
Definition TempFile.cc:164
Unlock and close the MDS item when the ReadLock goes out of scope.
Instantiate with a DDS or DMR and use to write the DAS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DDS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DMR response.
virtual void operator()(std::ostream &os)
Use an object (DDS or DMR) to write data to the MDS.