44#include "PPTConnection.h"
45#include "PPTProtocolNames.h"
49#include "BESInternalError.h"
55using std::ostringstream;
56using std::istringstream;
66#define prolog string("PPTConnection::").append(__func__).append("() - ")
68PPTConnection::~PPTConnection()
111 if (!buffer.empty()) {
112 sendChunk(buffer, extensions);
115 map<string, string> no_extensions;
116 sendChunk(
"", no_extensions);
119 sendChunk(
"", extensions);
127 map<string, string> extensions;
128 extensions[
"status"] = PPT_EXIT_NOW;
129 send(
"", extensions);
133 sendChunk(
"", extensions);
144void PPTConnection::sendChunk(
const string &buffer, map<string, string> &extensions)
147 if (extensions.size()) {
150 strm << hex << setw(7) << setfill(
'0') << buffer.size() <<
"d";
151 if (!buffer.empty()) {
154 string toSend = strm.str();
165 if (extensions.size()) {
169 for (; i != ie; i++) {
171 string value = (*i).second;
172 if (!value.empty()) {
173 estrm <<
"=" << value;
177 string xstr = estrm.str();
178 strm << hex << setw(7) << setfill(
'0') << xstr.size() <<
"x" << xstr;
179 string toSend = strm.str();
192 BESDEBUG(MODULE, prolog <<
"Sending " << buffer << endl);
193 _mySock->send(buffer, 0, buffer.size());
204 return _mySock->receive(buffer, buffer_size);
207int PPTConnection::readChunkHeader(
char *buffer,
int buffer_size)
209 char *temp_buffer = buffer;
210 int totalBytesRead = 0;
213 int bytesRead =
readBuffer(temp_buffer, buffer_size);
214 BESDEBUG( MODULE, prolog <<
"Read " << bytesRead <<
" bytes" << endl );
219 if (bytesRead == 0) {
222 if (bytesRead < buffer_size) {
223 buffer_size = buffer_size - bytesRead;
224 temp_buffer = temp_buffer + bytesRead;
225 totalBytesRead += bytesRead;
228 totalBytesRead += bytesRead;
232 buffer[totalBytesRead] =
'\0';
233 return totalBytesRead;
251bool PPTConnection::receive(map<string, string> &extensions, ostream *strm)
253 ostream *use_strm = _out;
254 if (strm) use_strm = strm;
258 BESDEBUG( MODULE, prolog <<
"buffer size = " << _inBuff_len << endl );
260 _inBuff_len = _mySock->getRecvBufferSize() + 1;
261 _inBuff =
new char[_inBuff_len + 1];
267 int bytesRead = readChunkHeader(_inBuff, 8);
268 BESDEBUG( MODULE, prolog <<
"Reading header, read " << bytesRead <<
" bytes" << endl );
269 if (bytesRead == 0) {
270 INFO_LOG(
"PPTConnection::receive: read EOF from the OLFS, beslistener exiting.\n");
277 extensions[
"status"] = PPT_EXIT_NOW;
280 else if (bytesRead != 8) {
281 throw BESInternalError(prolog +
"Failed to read chunk header", __FILE__, __LINE__);
285 lenbuffer[0] = _inBuff[0];
286 lenbuffer[1] = _inBuff[1];
287 lenbuffer[2] = _inBuff[2];
288 lenbuffer[3] = _inBuff[3];
289 lenbuffer[4] = _inBuff[4];
290 lenbuffer[5] = _inBuff[5];
291 lenbuffer[6] = _inBuff[6];
293 istringstream lenstrm(lenbuffer);
294 unsigned long inlen = 0;
295 lenstrm >> hex >> setw(7) >> inlen;
296 BESDEBUG( MODULE, prolog <<
"Reading header, chunk length = " << inlen << endl );
297 BESDEBUG( MODULE, prolog <<
"Reading header, chunk type = " << _inBuff[7] << endl );
299 if (_inBuff[7] ==
'x') {
301 receive(xstrm, inlen);
304 else if (_inBuff[7] ==
'd') {
310 receive(*use_strm, inlen);
313 string err = (string)
"type of data is " + _inBuff[7] +
", should be x for extensions or d for data";
329void PPTConnection::receive(ostream &strm,
const int len)
331 BESDEBUG( MODULE, prolog <<
"len = " << len << endl );
334 string err =
"buffer has not been initialized";
339 if( len > _inBuff_len )
341 to_read = _inBuff_len;
343 BESDEBUG( MODULE, prolog <<
"to_read = " << to_read << endl );
346 int bytesRead =
readBuffer( _inBuff, to_read );
349 string err =
"Failed to read data from socket";
350 throw BESInternalError( err, __FILE__, __LINE__ );
352 BESDEBUG( MODULE, prolog <<
"bytesRead = " << bytesRead << endl );
355 _inBuff[bytesRead] =
'\0';
356 strm.write( _inBuff, bytesRead );
361 if( bytesRead < len )
363 BESDEBUG( MODULE, prolog <<
"remaining = " << (len - bytesRead) << endl );
364 receive( strm, len - bytesRead );
385 unsigned int index = 0;
388 string::size_type semi = xstr.find(
';', index);
389 if (semi == string::npos) {
390 string err =
"malformed extensions " + xstr.substr(index, xstr.size() - index) +
", missing semicolon";
393 string::size_type eq = xstr.find(
'=', index);
394 if (eq == string::npos || eq > semi) {
396 var = xstr.substr(index, semi - index);
397 extensions[var] =
"";
399 else if (eq == semi - 1) {
400 string err =
"malformed extensions " + xstr.substr(index, xstr.size() - index)
401 +
", missing value after =";
405 var = xstr.substr(index, eq - index);
406 val = xstr.substr(eq + 1, semi - eq - 1);
407 extensions[var] = val;
410 if (index >= xstr.size()) {
428 struct pollfd arr[1];
429 arr[0].fd = getSocket()->getSocketDescriptor();
430 arr[0].events = POLLIN;
435 for (
int j = 0; j < _timeout; j++) {
436 if (poll(arr, 1, 1000) < 0) {
438 if (errno == EINTR || errno == EAGAIN)
continue;
440 throw BESInternalError(
string(
"poll error") +
" " + strerror(errno), __FILE__, __LINE__);
443 if (arr[0].revents == POLLIN) {
447 cout <<
" " << j << flush;
455unsigned int PPTConnection::getRecvChunkSize()
457 return _mySock->getRecvBufferSize() - PPT_CHUNK_HEADER_SPACE;
460unsigned int PPTConnection::getSendChunkSize()
462 return _mySock->getSendBufferSize() - PPT_CHUNK_HEADER_SPACE;
473 strm << BESIndent::LMarg <<
"PPTConnection::dump - (" << (
void *)
this <<
")" << endl;
476 BESIndent::UnIndent();
exception thrown if internal error encountered
void dump(std::ostream &strm) const override
dumps information about this object
void send(const std::string &buffer) override
sends the buffer to the socket
virtual int readBuffer(char *inBuff, const unsigned int buff_size)
read a buffer of data from the socket
void sendExtensions(std::map< std::string, std::string > &extensions) override
send the specified extensions
void sendExit() override
Send the exit token as an extension.
virtual void read_extensions(std::map< std::string, std::string > &extensions, const std::string &xstr)
the string passed are extensions, read them and store the name/value pairs into the passed map
virtual int readBufferNonBlocking(char *inBuff, const int buff_size)
read a buffer of data from the socket without blocking
void dump(std::ostream &strm) const override
dumps information about this object