// BYTES_PER_MEG (end of the next line) is 2^20: the factor used to convert
// the configured cache size from megabytes to bytes.
47 #include "DAPCache3.h" 51 #include "InternalErr.h" 52 #include "DapIndent.h" 56 #include "BESSyntaxUserError.h" 57 #include "BESInternalError.h" 59 #include "TheBESKeys.h" 69 static const unsigned long long BYTES_PER_MEG = 1048576ULL;
// Upper bound, in megabytes, applied to the configured cache size before it
// is converted to bytes.
// NOTE(review): (1ULL << 44) * BYTES_PER_MEG == 2^64, which does not fit in
// unsigned long long. Any MB->bytes conversion of a value capped here must
// also cap at ~0ULL / BYTES_PER_MEG (2^44 - 1) to avoid wrapping to 0.
73 static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44);
/**
 * Build a cache from explicit parameters.
 * @param cache_dir directory that holds the cached files
 * @param prefix prefix used to name (and later recognize) cache files
 * @param size maximum cache size in megabytes; m_initialize_cache_info()
 *        caps it and converts it to bytes.
 */
91 DAPCache3::DAPCache3(
const string &cache_dir,
const string &prefix,
unsigned long long size) :
92 d_cache_dir(cache_dir), d_prefix(prefix), d_max_cache_size_in_bytes(size)
// d_max_cache_size_in_bytes holds megabytes until m_initialize_cache_info()
// multiplies it by BYTES_PER_MEG.
94 m_initialize_cache_info();
// Destroy the singleton instance. get_instance() registers this function
// with atexit(). The lines that actually delete and reset d_instance are
// elided from this listing.
97 void DAPCache3::delete_instance() {
98 DBG(cerr <<
"DAPCache3::delete_instance() - Deleting singleton DAPCache3 instance." << endl);
// Create the BESCache3 singleton from BES configuration keys (directory,
// prefix and size key names).
// NOTE(review): this BESCache3 code appears inside the DAPCache3 listing.
// The visible lines assign d_instance unconditionally; confirm that the
// elided lines (119-120) guard against replacing an existing instance.
118 BESCache3::get_instance(BESKeys *keys,
const string &cache_dir_key,
const string &prefix_key,
const string &size_key)
121 d_instance =
new BESCache3(keys, cache_dir_key, prefix_key, size_key);
// get_instance(cache_dir, prefix, size): create the singleton on first use
// and schedule delete_instance() to run at process exit.
// NOTE(review): the check-then-create has no visible synchronization, so
// concurrent first calls could create two instances. Confirm callers are
// single-threaded.
140 if (d_instance == 0){
141 d_instance =
new DAPCache3(cache_dir, prefix, size);
143 atexit(delete_instance);
// Presumably from the parameterless get_instance(): reached when it is
// called before any instance has been created.
156 throw InternalErr(__FILE__, __LINE__,
"Tried to get the DAPCache3 instance, but it hasn't been created yet");
// Return the strerror() text for the current errno, or "Unknown error."
// (presumably when strerror() yields null; that test is elided).
// NOTE(review): strerror() may return a shared static buffer and is not
// guaranteed thread-safe. strerror_r() avoids this if threads are involved.
161 static inline string get_errno() {
162 char *s_err = strerror(errno);
166 return "Unknown error.";
// Fill in and return a flock request of the given type (F_RDLCK, F_WRLCK or
// F_UNLCK) for this process. The l_type/l_start/l_len assignments are elided
// here; presumably the request covers the whole file.
// NOTE(review): every call returns the SAME function-static object. The
// pointer is only valid until the next call, and the function is not
// thread-safe.
// NOTE(review): l_pid is set to getpid(). fcntl(F_SETLK/F_SETLKW) does not
// overwrite the struct (only F_GETLK does), so error messages that print
// l->l_pid report this process, not the one holding the conflicting lock.
170 static inline struct flock *lock(
int type) {
171 static struct flock lock;
173 lock.l_whence = SEEK_SET;
176 lock.l_pid = getpid();
// Remember the descriptor that holds the lock on 'file', so that
// unlock_and_close(file) can later find and release it via m_get_descriptor().
// NOTE(review): insert() does not replace an existing entry. If
// FilesAndLockDescriptors is a std::map and the same file is locked twice
// without being released, the second descriptor is silently not recorded.
// Confirm the container type.
181 inline void DAPCache3::m_record_descriptor(
const string &file,
int fd) {
182 DBG(cerr <<
"DAP Cache: recording descriptor: " << file <<
", " << fd << endl);
183 d_locks.insert(std::pair<string, int>(file, fd));
// Look up the descriptor recorded for 'file' by m_record_descriptor(). The
// elided lines presumably read fd from the entry, erase it and return fd.
// NOTE(review): no visible line compares the find() result with
// d_locks.end(). If 'file' was never recorded, dereferencing the iterator is
// undefined behavior; throwing InternalErr there would be safer.
186 inline int DAPCache3::m_get_descriptor(
const string &file) {
187 FilesAndLockDescriptors::iterator i = d_locks.find(file);
189 DBG(cerr <<
"DAP Cache: getting descriptor: " << file <<
", " << fd << endl);
/**
 * Release the fcntl lock held on 'fd' and close the descriptor.
 * @param fd descriptor whose lock is released and which is then closed
 * @throws InternalErr if unlocking or closing fails; the errno text is
 *         appended after a ": " separator (previously it was glued directly
 *         onto the message).
 */
199 static void unlock(
int fd)
201 if (fcntl(fd, F_SETLK, lock(F_UNLCK)) == -1) {
202 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to unlock the file: " + get_errno());
206 throw InternalErr(__FILE__, __LINE__,
"Could not close the (just) unlocked file: " + get_errno());
/**
 * Open 'file_name' read-only and block (F_SETLKW) until a shared read lock
 * on it is granted.
 * @param file_name file to lock
 * @param ref_fd presumably receives the locked descriptor (assignment elided)
 * @return presumably false when the file does not exist (the open() error
 *         branch is partly elided) and true once the lock is held
 * @throws InternalErr for other open() failures or a locking error
 */
221 static bool getSharedLock(
const string &file_name,
int &ref_fd)
223 DBG(cerr <<
"getSharedLock: " << file_name <<endl);
226 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
232 throw InternalErr(__FILE__, __LINE__, get_errno());
236 struct flock *l = lock(F_RDLCK);
237 if (fcntl(fd, F_SETLKW, l) == -1) {
// NOTE(review): l->l_pid is this process's pid (set by lock()), not another
// "cache process". fd is presumably not closed on this error path (elided).
240 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
244 DBG(cerr <<
"getSharedLock exit: " << file_name <<endl);
/**
 * Open 'file_name' read-write and block (F_SETLKW) until an exclusive write
 * lock on it is granted.
 * @param file_name file to lock (taken by value; const string & would do)
 * @param ref_fd presumably receives the locked descriptor (assignment elided)
 * @return presumably false when the file does not exist and true once the
 *         lock is held (the open() error branch is partly elided)
 * @throws InternalErr for other open() failures or a locking error
 */
263 static bool getExclusiveLock(
string file_name,
int &ref_fd)
265 DBG(cerr <<
"getExclusiveLock: " << file_name <<endl);
268 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
274 throw InternalErr(__FILE__, __LINE__, get_errno());
278 struct flock *l = lock(F_WRLCK);
279 if (fcntl(fd, F_SETLKW, l) == -1) {
// NOTE(review): l->l_pid is this process's pid (set by lock()).
282 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
286 DBG(cerr <<
"getExclusiveLock exit: " << file_name <<endl);
/**
 * Non-blocking variant of getExclusiveLock(): try once (F_SETLK) to take an
 * exclusive write lock on 'file_name'.
 * @param file_name file to lock
 * @param ref_fd presumably receives the locked descriptor on success
 * @return false when the lock is currently unavailable, true once held
 * @throws InternalErr when open() fails (the visible branch throws) or for
 *         a locking error other than "lock unavailable" (the errno test
 *         separating these cases is elided)
 * NOTE(review): fcntl record locks are per-process, so this succeeds on a
 * file that the SAME process has locked through another descriptor.
 */
304 static bool getExclusiveLockNB(
string file_name,
int &ref_fd)
306 DBG(cerr <<
"getExclusiveLock_nonblocking: " << file_name <<endl);
309 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
315 throw InternalErr(__FILE__, __LINE__, get_errno());
319 struct flock *l = lock(F_WRLCK);
320 if (fcntl(fd, F_SETLK, l) == -1) {
// NOTE(review): F_SETLK does not fill in the holder's pid, so "by:" below
// prints this process's own pid.
323 DBG(cerr <<
"getExclusiveLock_nonblocking exit (false): " << file_name <<
" by: " << l->l_pid << endl);
330 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
336 DBG(cerr <<
"getExclusiveLock_nonblocking exit (true): " << file_name <<endl);
/**
 * Atomically create 'file_name' (O_CREAT | O_EXCL, mode 0666) and block
 * until an exclusive write lock on it is granted.
 * @param file_name file to create
 * @param ref_fd presumably receives the locked descriptor (assignment elided)
 * @return presumably false when the file already exists (open() error branch
 *         partly elided) and true once it is created and locked
 * @throws InternalErr for other open() failures or a locking error
 * NOTE(review): between open() and fcntl() the new, empty file exists
 * unlocked. Another process running getSharedLock() on it in that window can
 * take a read lock first and read an empty file.
 */
356 static bool createLockedFile(
string file_name,
int &ref_fd)
358 DBG(cerr <<
"createLockedFile: " << file_name <<endl);
361 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
367 throw InternalErr(__FILE__, __LINE__, get_errno());
371 struct flock *l = lock(F_WRLCK);
372 if (fcntl(fd, F_SETLKW, l) == -1) {
375 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
379 DBG(cerr <<
"createLockedFile exit: " << file_name <<endl);
/**
 * Validate the cache parameters. The directory must be non-empty and exist
 * (mkdir with mode 0775 is tried if it does not), the prefix must be
 * non-empty, and the size must be non-zero. Each failure builds an error
 * string that is presumably thrown in the elided lines.
 */
387 void DAPCache3::m_check_ctor_params()
389 if (d_cache_dir.empty()) {
390 string err =
"The cache directory was not specified, must be non-empty";
396 int statret = stat(d_cache_dir.c_str(), &buf);
397 if (statret != 0 || !S_ISDIR(buf.st_mode)) {
// NOTE(review): mkdir() creates only the last path component, so a missing
// parent directory makes this fail.
399 int status = mkdir(d_cache_dir.c_str(), 0775);
401 string err =
"The cache directory " + d_cache_dir +
" does not exist or could not be created.";
406 if (d_prefix.empty()) {
407 string err =
"The cache file prefix was not specified, must not be empty";
// NOTE(review): the member is unsigned, so '<= 0' is just '== 0'.
411 if (d_max_cache_size_in_bytes <= 0) {
412 string err =
"The cache size was not specified, must be greater than zero";
// NOTE(review): m_initialize_cache_info() calls this AFTER converting the size
// to bytes, but the limit is in megabytes. Any cache larger than 2^44 bytes
// (16 TiB) trips this check, and the message mixes MB and bytes.
420 if (d_max_cache_size_in_bytes > MAX_CACHE_SIZE_IN_MEGABYTES) {
421 std::ostringstream msg;
422 msg <<
"The specified cache size was larger than the max cache size of: " << MAX_CACHE_SIZE_IN_MEGABYTES
423 <<
" (was " << d_max_cache_size_in_bytes <<
").";
427 DBG(cerr <<
"DAP Cache: directory " << d_cache_dir <<
", prefix " << d_prefix
428 <<
", max size " << d_max_cache_size_in_bytes << endl );
432 void DAPCache3::m_initialize_cache_info()
436 d_max_cache_size_in_bytes = min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
437 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
438 d_target_size = d_max_cache_size_in_bytes * 0.8;
440 m_check_ctor_params();
442 d_cache_info = d_cache_dir +
"/dap.cache.info";
446 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
448 unsigned long long size = 0;
449 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
450 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file in startup!");
456 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
457 throw InternalErr(__FILE__, __LINE__, get_errno());
461 DBG(cerr <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
/**
 * Build a cache from BES configuration keys.
 * @param keys BES key store to read from
 * @param cache_dir_key key naming the cache directory
 * @param prefix_key key naming the cache file prefix
 * @param size_key key naming the maximum cache size (megabytes)
 * @throws BESSyntaxUserError if any of the three keys is missing
 * NOTE(review): the size is parsed with operator>> and the stream state is
 * never checked. A non-numeric value leaves 0 (later rejected as "not
 * specified"). A negative value such as "-1" is accepted for an unsigned
 * target and wraps to a huge number, which the maximum then caps.
 */
479 BESCache3::BESCache3(BESKeys *keys,
const string &cache_dir_key,
const string &prefix_key,
const string &size_key) :
480 d_max_cache_size_in_bytes(0)
483 keys->get_value(cache_dir_key, d_cache_dir, found);
485 throw BESSyntaxUserError(
"The cache directory key " + cache_dir_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
488 keys->get_value(prefix_key, d_prefix, found);
490 throw BESSyntaxUserError(
"The prefix key " + prefix_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
493 string cache_size_str;
494 keys->get_value(size_key, cache_size_str, found);
496 throw BESSyntaxUserError(
"The size key " + size_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
498 std::istringstream is(cache_size_str);
499 is >> d_max_cache_size_in_bytes;
501 m_initialize_cache_info();
/**
 * Mangling branch of get_cache_file_name(src, mangle); the header and the
 * 'mangle' test are elided. It turns 'src' into a cache file path by:
 *  - dropping one leading '/';
 *  - turning every remaining '/' into DAP_CACHE_CHAR;
 *  - stripping the extension of the final path component.
 * Result: <cache dir>/<prefix><DAP_CACHE_CHAR><mangled name>.
 * NOTE(review): target.at(0) throws std::out_of_range if target is empty.
 * NOTE(review): the substr() uses 'src' with target's length. This is
 * equivalent only if target is still a copy of src here; confirm.
 */
526 if (target.at(0) ==
'/') {
527 target = src.substr(1, target.length() - 1);
529 string::size_type slash = 0;
// Remember where the final path component starts before the '/' characters
// are replaced. replace() swaps one char for one char, so it stays valid.
string::size_type last_slash = target.rfind('/');
530 while ((slash = target.find(
'/')) != string::npos) {
531 target.replace(slash, 1, 1, DAPCache3::DAP_CACHE_CHAR);
533 string::size_type last_dot = target.rfind(
'.');
// Strip an extension only when the last '.' lies inside the final path
// component and is not its first character. Searching the whole name
// truncated "/data.d/file" to "data", so "/data.d/a" and "/data.d/b"
// collided on one cache file (likewise ".hidden" became "").
534 if (last_dot != string::npos &&
last_dot > (last_slash == string::npos ? 0 : last_slash + 1)) {
535 target = target.substr(0, last_dot);
538 DBG(cerr <<
" d_cache_dir: '" << d_cache_dir <<
"'" << endl);
539 DBG(cerr <<
" d_prefix: '" << d_prefix <<
"'" << endl);
540 DBG(cerr <<
" target: '" << target <<
"'" << endl);
542 return d_cache_dir +
"/" + d_prefix + DAPCache3::DAP_CACHE_CHAR + target;
// Body of get_read_lock(target, fd): take a shared lock on 'target' via
// getSharedLock(). The descriptor is recorded so unlock_and_close(target) can
// release it; presumably only when status is true (the guard is elided).
566 bool status = getSharedLock(target, fd);
568 DBG(cerr <<
"DAP Cache: read_lock: " << target <<
"(" << status <<
")" << endl);
571 m_record_descriptor(target, fd);
// Body of create_and_lock(target, fd): create 'target' exclusively and hold
// a write lock on it via createLockedFile(). The descriptor is recorded so
// unlock_and_close(target) can release it; presumably only when status is
// true (the guard is elided).
594 bool status = createLockedFile(target, fd);
596 DBG(cerr <<
"DAP Cache: create_and_lock: " << target <<
"(" << status <<
")" << endl);
599 m_record_descriptor(target, fd);
// Body of exclusive_to_shared_lock(fd): ask fcntl (blocking, F_SETLKW) to
// replace this process's lock on fd with a read lock.
// NOTE(review): the local flock named 'lock' (declaration elided) shadows the
// file-level lock() helper. lock(F_RDLCK) would be consistent with the rest
// of the file.
623 lock.l_type = F_RDLCK;
624 lock.l_whence = SEEK_SET;
627 lock.l_pid = getpid();
629 if (fcntl(fd, F_SETLKW, &lock) == -1) {
630 throw InternalErr(__FILE__, __LINE__, get_errno());
// Body of lock_cache_write(): block until an exclusive (write) lock on the
// cache info file is granted. The errno text now follows a ": " separator
// instead of being glued onto the message.
644 DBG(cerr <<
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
646 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_WRLCK)) == -1) {
647 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to lock the cache-control file: " + get_errno());
// Body of lock_cache_read(): block until a shared (read) lock on the cache
// info file is granted. The errno text now follows a ": " separator.
656 DBG(cerr <<
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
658 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_RDLCK)) == -1) {
659 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to lock the cache-control file: " + get_errno());
// Body of unlock_cache(): release this process's lock on the cache info file.
// The descriptor stays open for later use. The errno text now follows a ": "
// separator.
670 DBG(cerr <<
"DAP Cache: unlock: cache_info (fd: " << d_cache_info_fd <<
")" << endl);
672 if (fcntl(d_cache_info_fd, F_SETLK, lock(F_UNLCK)) == -1) {
673 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to unlock the cache-control file: " + get_errno());
// Release the lock taken on 'file_name' by get_read_lock()/create_and_lock().
// The descriptor is looked up with m_get_descriptor(), then unlock() unlocks
// and closes it.
688 void DAPCache3::unlock_and_close(
const string &file_name)
690 DBG(cerr <<
"DAP Cache: unlock file: " << file_name << endl);
692 unlock(m_get_descriptor(file_name));
// Release the lock on, and close, a descriptor the caller holds directly
// (the actual unlock call on line 704 is elided).
// NOTE(review): if fd was also recorded with m_record_descriptor(), nothing
// visible here removes that entry from d_locks; confirm.
700 void DAPCache3::unlock_and_close(
int fd)
702 DBG(cerr <<
"DAP Cache: unlock fd: " << fd << endl);
706 DBG(cerr <<
"DAP Cache: unlock " << fd <<
" Success" << endl);
/**
 * Body of update_cache_info(target): read the total size stored in the
 * cache info file, add the size of 'target', and write the new total back.
 * Fixes relative to the listing:
 *  - the mis-encoded "¤t_size" (an HTML-entity corruption that does not
 *    compile) is restored to &current_size;
 *  - the write error now says "to" rather than "from";
 *  - the read error no longer says "get read".
 * NOTE(review): presumably called with the cache write lock held, so the
 * read-modify-write is not interleaved with other processes; confirm.
 */
724 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
725 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
728 unsigned long long current_size;
729 if (read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
730 throw InternalErr(__FILE__, __LINE__,
"Could not read size info from the cache info file!");
733 int statret = stat(target.c_str(), &buf);
735 current_size += buf.st_size;
737 throw InternalErr(__FILE__, __LINE__,
"Could not read the size of the new file: " + target +
" : " + get_errno());
739 DBG(cerr <<
"DAP Cache: cache size updated to: " << current_size << endl);
741 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
742 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
744 if(write(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
745 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file!");
// Body of cache_too_big(current_size): true when the given size (bytes)
// exceeds the configured maximum, which m_initialize_cache_info() converts
// to bytes.
762 return current_size > d_max_cache_size_in_bytes;
/**
 * Body of get_cache_size(): rewind the cache info file and read the stored
 * total size. The mis-encoded "¤t_size" is restored to &current_size, and
 * the read error wording is fixed.
 * @throws InternalErr if the file cannot be rewound or fully read
 */
777 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
778 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
780 unsigned long long current_size;
781 if(read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
782 throw InternalErr(__FILE__, __LINE__,
"Could not read size info from the cache info file!");
// Comparator body (presumably entry_op, used by contents.sort()): order
// cache entries by entry.time, which holds st_atime, oldest first. Purges
// therefore try the least-recently-accessed files first.
796 return e1.time < e2.time;
/**
 * Scan the cache directory for files whose names start with d_prefix.
 * Append a (name, size, access time) entry for each to 'contents', sorted
 * oldest access first, and return the sum of their sizes in bytes.
 * @throws InternalErr if the directory cannot be opened, or when a
 *         zero-byte cache file is found (the exact guard is elided)
 * NOTE(review): the info file is named "dap.cache.info". If d_prefix is a
 * prefix of that name (e.g. "dap"), the info file is counted here and becomes
 * a purge candidate.
 * NOTE(review): createLockedFile() creates empty files before they are
 * written, so a scan that races with a concurrent create can hit the
 * zero-byte error and abort the whole purge.
 */
800 unsigned long long DAPCache3::m_collect_cache_dir_info(CacheFiles &contents)
802 DIR *dip = opendir(d_cache_dir.c_str());
804 throw InternalErr(__FILE__, __LINE__,
"Unable to open cache directory " + d_cache_dir);
807 vector<string> files;
810 while ((dit = readdir(dip)) != NULL) {
811 string dirEntry = dit->d_name;
812 if (dirEntry.compare(0, d_prefix.length(), d_prefix) == 0) {
813 files.push_back(d_cache_dir +
"/" + dirEntry);
819 unsigned long long current_size = 0;
// Files that vanish between readdir() and stat() fail the stat() and are
// skipped (presumably; the else-branch is elided).
821 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
822 if (stat(file->c_str(), &buf) == 0) {
823 current_size += buf.st_size;
826 entry.size = buf.st_size;
827 entry.time = buf.st_atime;
831 throw InternalErr(__FILE__, __LINE__,
"Zero-byte file found in cache. " + *file);
833 contents.push_back(entry);
838 contents.sort(entry_op);
/**
 * Body of update_and_purge(new_file):
 *  - measure the cache directory;
 *  - if it is too big, delete the oldest unlocked files (never 'new_file')
 *    until the total drops to d_target_size;
 *  - store the new total in the cache info file.
 * Fix: the cache_contents debug listings passed `endl << ...` and
 * `(*ti).time << ...` to DBG. Neither has an ostream on the left, so they
 * are ill-formed whenever DBG expands its argument. They now write to cerr
 * like every other DBG call in this file.
 * NOTE(review): fcntl locks are per-process. getExclusiveLockNB() succeeds
 * on files this same process has read-locked through another descriptor, and
 * closing the purge descriptor (presumably in the elided lines after unlink)
 * drops all of this process's locks on that file.
 * NOTE(review): if unlink() fails, the throw leaves cfile_fd open and locked.
 */
856 DBG(cerr <<
"purge - starting the purge" << endl);
862 unsigned long long computed_size = m_collect_cache_dir_info(contents);
864 if (BESISDEBUG(
"cache_contents" )) {
865 DBG(cerr << endl <<
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
866 CacheFiles::iterator ti = contents.begin();
867 CacheFiles::iterator te = contents.end();
868 for (; ti != te; ti++) {
869 DBG(cerr << (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
873 DBG(cerr <<
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
876 if (cache_too_big(computed_size)) {
880 CacheFiles::iterator i = contents.begin();
881 while (i != contents.end() && computed_size > d_target_size) {
// Skip the file just added and any file another process currently holds a
// lock on (the non-blocking lock attempt fails).
886 if (i->name != new_file && getExclusiveLockNB(i->name, cfile_fd)) {
887 DBG(cerr <<
"purge: " << i->name <<
" removed." << endl );
889 if (unlink(i->name.c_str()) != 0)
890 throw InternalErr(__FILE__, __LINE__,
"Unable to purge the file " + i->name +
" from the cache: " + get_errno());
893 computed_size -= i->size;
898 DBG(cerr <<
"purge: " << i->name <<
" is in use." << endl );
903 DBG(cerr <<
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
908 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
909 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
911 if(write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
912 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file!");
914 if (BESISDEBUG(
"cache_contents" )) {
916 computed_size = m_collect_cache_dir_info(contents);
917 DBG(cerr << endl <<
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
918 CacheFiles::iterator ti = contents.begin();
919 CacheFiles::iterator te = contents.end();
920 for (; ti != te; ti++) {
921 DBG(cerr << (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
// Body of purge_file(file): wait (blocking) for an exclusive lock on 'file',
// record its size, unlink it, and subtract that size from the total stored in
// the cache info file.
// NOTE(review): get_cache_size() - size underflows (unsigned wrap) if the
// stored total is smaller than the file's size, e.g. when the file was never
// added through update_cache_info().
// NOTE(review): if stat() fails, size stays 0 and the stored total is
// rewritten unchanged.
946 DBG(cerr <<
"purge_file - starting the purge" << endl);
953 if (getExclusiveLock(file, cfile_fd)) {
955 unsigned long long size = 0;
957 if (stat(file.c_str(), &buf) == 0) {
961 DBG(cerr <<
"purge_file: " << file <<
" removed." << endl );
963 if (unlink(file.c_str()) != 0)
965 "Unable to purge the file " + file +
" from the cache: " + get_errno());
969 unsigned long long cache_size = get_cache_size() - size;
971 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
972 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
974 if (write(d_cache_info_fd, &cache_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
975 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file!");
// Body of dump(strm): write this object's address, cache directory, prefix
// and maximum size (bytes) to 'strm' at the current DapIndent margin, then
// restore the indent (the matching Indent() call is elided).
995 strm << DapIndent::LMarg <<
"DAPCache3::dump - (" << (
void *)
this <<
")" << endl;
997 strm << DapIndent::LMarg <<
"cache dir: " << d_cache_dir << endl;
998 strm << DapIndent::LMarg <<
"prefix: " << d_prefix << endl;
999 strm << DapIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
1000 DapIndent::UnIndent();
virtual unsigned long long update_cache_info(const string &target)
Update the cache info file to include 'target'.
string get_cache_file_name(const string &src, bool mangle=true)
virtual void dump(ostream &strm) const
dumps information about this object
virtual unsigned long long get_cache_size()
Get the cache size. Read the size information from the cache info file and return it...
top level DAP object to house generic methods
A class for software fault reporting.
virtual bool create_and_lock(const string &target, int &fd)
Create a file in the cache and lock it for write access. If the file does not exist, make it, open it for read-write access and get an exclusive lock on it. The locking operation blocks, although that should never happen.
virtual void lock_cache_write()
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big...
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock. If the file has an exclusive write lock on it...
virtual bool get_read_lock(const string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void lock_cache_read()
virtual void update_and_purge(const string &new_file)
Purge files from the cache.
static DAPCache3 * get_instance()
Implementation of a caching mechanism for compressed data. This cache uses simple advisory locking fo...
virtual void purge_file(const string &file)
Purge a single file from the cache.
virtual void unlock_cache()