#include "ApismTcpClient.h"
#include <QHostAddress>
#include <QTimer>


/**
 * @brief Construct a message placeholder in state INVALID.
 */
ApismMessage::ApismMessage()
    : state(MessageState::INVALID)
{
}


/**
 * @brief Create the client, its two single-shot timers and its socket.
 * @param hostname          host used by the argument-less connectToHost()
 * @param port              port (as decimal string) used by the argument-less connectToHost()
 * @param expectedResponse  selects how received data is interpreted (JSON or STATE)
 * @param parent            QObject parent
 *
 * Both timers default to an interval of 10000 ms. No connection is opened here.
 */
ApismTcpClient::ApismTcpClient(const QString & hostname,
                               const QString & port,
                               ExpectedResponse expectedResponse,
                               QObject *parent)
    : QObject(parent)
    , hostname(hostname)
    , port(port)
    , responseTimerTimeoutCounter(0)
    , flag_selfClosed(false)
    , resendCounter(0)
    , connectionRefusedCounter(0)
    , expectedResponse(expectedResponse)
    , isDebug(false)
{
    this->responseTimeoutTimer = new QTimer(this);
    this->responseTimeoutTimer->setInterval(10000);
    this->responseTimeoutTimer->setSingleShot(true);
    connect(this->responseTimeoutTimer, SIGNAL(timeout()), this, SLOT(onResponseTimeoutTimerTimeout()));

    this->connectTimeoutTimer = new QTimer(this);
    this->connectTimeoutTimer->setInterval(10000);
    this->connectTimeoutTimer->setSingleShot(true);
    connect(this->connectTimeoutTimer, SIGNAL(timeout()), this, SLOT(onConnectTimeoutTimerTimeout()));

    socket = new QTcpSocket(this);
    connect(socket, SIGNAL(connected()), this, SLOT(onSocketConnected()));
    connect(socket, SIGNAL(disconnected()), this, SLOT(onSocketDisconnected()));
    connect(socket, SIGNAL(readyRead()), this, SLOT(onSocketReadyRead()));
    connect(socket, SIGNAL(bytesWritten(qint64)), this, SLOT(onSocketBytesWritten(qint64)));

    // note: from Qt 5.15 onward a new signal "errorOccurred" will be introduced which could be used without this static_cast.
    // QAbstractSocket::error is overloaded (getter and signal), so the signal has to be selected explicitly.
    // see e.g. https://stackoverflow.com/questions/35655512/compile-error-when-connecting-qtcpsocketerror-using-the-new-qt5-signal-slot
    connect(socket,
            static_cast<void (QAbstractSocket::*)(QAbstractSocket::SocketError)>(&QAbstractSocket::error),
            this,
            &ApismTcpClient::onSocketErrorOccured);

    connect(socket, &QTcpSocket::stateChanged, this, &ApismTcpClient::onSocketStateChanged);

    this->currentMessage = ApismMessage();
}


/**
 * @brief Set the interval of the response timeout timer.
 * @param timeout_ms  interval in milliseconds
 */
void ApismTcpClient::setResponseTimeout(const quint32 timeout_ms)
{
    this->responseTimeoutTimer->setInterval(timeout_ms);
}


/**
 * @brief Enable or disable the additional debug log output.
 * @param debug  true to enable
 */
void ApismTcpClient::setDebug(bool debug)
{
    this->isDebug = debug;
}


/**
 * @brief Start the connect timeout timer and connect to the host/port given at construction.
 *
 * The host is passed to the socket as a string (like in the two-argument
 * overload), so both literal IP addresses and host names are accepted.
 */
void ApismTcpClient::connectToHost()
{
    this->connectTimeoutTimer->start();

    int portNumber = this->port.toInt();
    this->socket->connectToHost(this->hostname, portNumber);
}


/**
 * @brief Start the connect timeout timer and connect to the given host/port.
 * @param hostname  host to connect to
 * @param port      port as decimal string
 *
 * The stored hostname/port members are not modified.
 */
void ApismTcpClient::connectToHost(const QString & hostname, const QString & port)
{
    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::connectToHost(" << hostname << ", " << port << ")";

    this->connectTimeoutTimer->start();

    int portNumber = port.toInt();
    socket->connectToHost(hostname, portNumber);
}


/**
 * @brief Close the socket and remember that the close was initiated locally.
 */
void ApismTcpClient::closeConnection()
{
    this->flag_selfClosed = true;
    socket->close();
}


/**
 * @brief Report whether the socket is usable for sending.
 * @return true if the socket state is ConnectedState or BoundState, false otherwise
 */
bool ApismTcpClient::isConnected()
{
    switch (socket->state()) {
    case QAbstractSocket::ConnectedState:      /* FALLTHRU */
    case QAbstractSocket::BoundState:
        return true;
    case QAbstractSocket::UnconnectedState:    /* FALLTHRU */
    case QAbstractSocket::HostLookupState:     /* FALLTHRU */
    case QAbstractSocket::ConnectingState:     /* FALLTHRU */
    case QAbstractSocket::ClosingState:        /* FALLTHRU */
    case QAbstractSocket::ListeningState:
        return false;
    }

    return false;
}


/**
 * @brief ApismTcpClient::sendData
 * @param message
 *
 * Enqueue message, and try to send it
 */
void ApismTcpClient::sendData(const QByteArray &message)
{
    if (this->isDebug) {
        qCritical() << "ApismTcpClient::sendData(" << message << ")";
    }

    this->sendQueue.enqueue(message);

    this->sendData();
}


/**
 * @brief ApismTcpClient::sendData
 *
 * Check connection and try to send message from queue.
 * If the current message is still in state SENT nothing is sent; if the
 * socket is not connected a connect is started instead of sending.
 */
void ApismTcpClient::sendData()
{
    if (this->sendQueue.size() == 0) {
        if (this->isDebug) {
            qCritical() << "ApismTcpClient::sendData()" << "no messages in send queue";
        }
        return;
    }

    // DEBUG
    if (this->isDebug) {
        qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::sendData() sendQueue.size() = " << this->sendQueue.size();
    }

    switch (this->currentMessage.state) {
    case ApismMessage::MessageState::INVALID:   /* FALLTHROUGH */
    case ApismMessage::MessageState::NEW:       /* FALLTHROUGH */
    case ApismMessage::MessageState::RESEND:    /* FALLTHROUGH */
    case ApismMessage::MessageState::ANSWERED:
        // allow send message
        if (this->isConnected()) {
            this->private_sendData();
        }
        else {
            // sending continues in onSocketConnected()
            this->connectToHost();
        }
        break;
    case ApismMessage::MessageState::SENT:
        // wait for answer...
        if (this->isDebug) {
            qCritical() << " ... wait for answer";
        }
        break;
    }
}


/**
 * @brief ApismTcpClient::private_sendData
 *
 * Take the next message from the queue, write it to the socket and start
 * the response timeout timer.
 *
 * Precondition:
 *   - queue is not empty,
 *   - socket state is connected
 *   - current message is not in state SENT
 */
void ApismTcpClient::private_sendData()
{
    // take message from queue
    this->currentMessage.data = this->sendQueue.dequeue();
    this->currentMessage.state = ApismMessage::MessageState::SENT;

    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::send: " << QString(this->currentMessage.data);

    socket->write(this->currentMessage.data);
    socket->flush();

    // start timeoutTimer
    this->responseTimeoutTimer->start();
}


/**
 * @brief Slot: socket is connected.
 *
 * Stops the connect timeout timer, resets the connection-refused counter and
 * sends the next queued message unless a message is still waiting for its answer.
 */
void ApismTcpClient::onSocketConnected()
{
    qCritical() << "ApismTcpClient(" << this->expectedResponse << "): Connected!";

    this->connectTimeoutTimer->stop();
    this->connectionRefusedCounter = 0;

    switch (this->currentMessage.state) {
    case ApismMessage::MessageState::INVALID:   /* FALLTHROUGH */
    case ApismMessage::MessageState::NEW:       /* FALLTHROUGH */
    case ApismMessage::MessageState::RESEND:    /* FALLTHROUGH */
    case ApismMessage::MessageState::ANSWERED:
        // allow send next message
        if (this->sendQueue.size() > 0) {
            this->private_sendData();
        }
        break;
    case ApismMessage::MessageState::SENT:
        // wait for answer...
        break;
    }
}


/**
 * @brief Slot: socket was disconnected.
 *
 * If the remote host closed the connection while a response was still
 * pending, the current message is either re-queued and resent (STATE) or
 * dropped with connectionClosedByRemoteHost() being emitted (JSON).
 */
void ApismTcpClient::onSocketDisconnected()
{
    qCritical() << "ApismTcpClient(" << this->expectedResponse << "): Disconnected!";

    // the error string is only of interest if we did not close the socket ourselves
    if (!this->flag_selfClosed) {
        qCritical() << " -> SocketErrorString: " << socket->errorString();
    }
    this->flag_selfClosed = false;

    const bool closedByRemote = (socket->error() == QAbstractSocket::SocketError::RemoteHostClosedError);

    if (closedByRemote && this->responseTimeoutTimer->isActive()) {
        this->responseTimeoutTimer->stop();
        qCritical() << " -> still waiting for response ";

        switch (this->expectedResponse) {
        case ApismTcpClient::ExpectedResponse::STATE:
            // try resend:
            this->currentMessage.state = ApismMessage::MessageState::RESEND;

            // enqueue current message for resend:
            this->sendQueue.prepend(this->currentMessage.data);
            this->sendData();
            break;
        case ApismTcpClient::ExpectedResponse::JSON:
            this->currentMessage.state = ApismMessage::MessageState::INVALID;
            emit this->connectionClosedByRemoteHost();
            break;
        }
    }
}


/**
 * @brief Slot: bytes were written to the socket (debug output only).
 * @param bytes  number of bytes reported by the socket
 */
void ApismTcpClient::onSocketBytesWritten(qint64 bytes)
{
    if (this->isDebug) {
        qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onSocketBytesWritten() -> " << bytes << " bytes written";
    }
}


/**
 * @brief Slot: data is available on the socket.
 *
 * Reads all available data and hands it to the response handler selected by
 * expectedResponse. Afterwards either the next queued message is scheduled
 * (after 1000 ms) or, if the queue is empty, the socket is closed.
 *
 * If the handler did not recognise the data as an answer (the current
 * message is still in state SENT), the response timeout timer is started
 * again and the socket is kept open, so that the remainder of the answer can
 * arrive or the timeout handling can take over. Without this the client
 * would stay in state SENT with a stopped timer and never send again.
 */
void ApismTcpClient::onSocketReadyRead()
{
    QByteArray readData;

    // stop timeoutTimer
    this->responseTimeoutTimer->stop();

    readData = socket->readAll();

    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::received: " << QString(readData);

    switch (this->expectedResponse) {
    case ApismTcpClient::ExpectedResponse::JSON:
        this->private_handleJSONResponse(readData);
        break;
    case ApismTcpClient::ExpectedResponse::STATE:
        this->private_handleStateResponse(readData);
        break;
    }

    if (this->currentMessage.state == ApismMessage::MessageState::SENT) {
        // still no valid answer: keep waiting, but never without a running timeout
        this->responseTimeoutTimer->start();
        return;
    }

    if (this->sendQueue.size() > 0) {
        QTimer::singleShot(1000,
                           this,
                           [this]() { this->sendData(); }
        );
    }
    else {
        this->flag_selfClosed = true;
        this->socket->close();
    }
}


/******************************************************************************
 * response handler:
 */

/**
 * @brief Handle a response in JSON mode.
 * @param responseMessage  received data, forwarded unmodified via receivedData()
 *
 * Note: the signal is emitted before the message state is set to ANSWERED.
 */
void ApismTcpClient::private_handleJSONResponse(QByteArray & responseMessage)
{
    emit this->receivedData(responseMessage);

    // allow send next message:
    this->currentMessage.state = ApismMessage::MessageState::ANSWERED;
    this->resendCounter = 0;
}


/**
 * @brief Handle a response in STATE mode.
 * @param responseMessage  received data
 *
 * possible answers:
 *  "RECORD SAVED"          --> everything is ok
 *  "RECORD WRITE ABORTED"  --> initiate a (delayed) resend
 *
 * Any other content leaves the message state untouched.
 */
void ApismTcpClient::private_handleStateResponse(QByteArray & responseMessage)
{
    QString responseMessageString = QString(responseMessage);

    if (responseMessageString.contains("ABORTED")) {
        // Try to resend later:
        this->currentMessage.state = ApismMessage::MessageState::RESEND;

        // enqueue current message for resend:
        this->sendQueue.prepend(this->currentMessage.data);
    }
    else if (responseMessageString.contains("RECORD SAVED")) {
        // allow send next message:
        this->currentMessage.state = ApismMessage::MessageState::ANSWERED;
        this->resendCounter = 0;
    }
}


/******************************************************************************
 */

/**
 * @brief Slot: no response was received within the response timeout.
 *
 * For messages in state SENT or RESEND the message is re-queued and sent
 * again (STATE) or dropped with responseTimeout() being emitted (JSON);
 * resendCounter is incremented in both cases. In all other states the
 * timeout is ignored.
 */
void ApismTcpClient::onResponseTimeoutTimerTimeout()
{
    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onResponseTimeoutTimerTimeout():";

    switch (this->currentMessage.state) {
    case ApismMessage::MessageState::INVALID:   /* FALLTHROUGH */
    case ApismMessage::MessageState::NEW:       /* FALLTHROUGH */
    case ApismMessage::MessageState::ANSWERED:
        // ignore (and do not count as resend)
        qCritical() << " -> ignore timeout";
        return;
    case ApismMessage::MessageState::RESEND:
        qCritical() << " -> timeout (RESEND)";
        qCritical() << " -> resendCounter = " << this->resendCounter;

        switch (this->expectedResponse) {
        case ApismTcpClient::ExpectedResponse::STATE:
            // try resend:
            this->currentMessage.state = ApismMessage::MessageState::RESEND;

            // enqueue current message for resend:
            this->sendQueue.prepend(this->currentMessage.data);
            this->sendData();
            break;
        case ApismTcpClient::ExpectedResponse::JSON:
            this->currentMessage.state = ApismMessage::MessageState::INVALID;
            emit this->responseTimeout();
            break;
        }
        break;
    case ApismMessage::MessageState::SENT:
        // we still do not have a response:
        switch (this->expectedResponse) {
        case ApismTcpClient::ExpectedResponse::STATE:
            // try resend:
            this->currentMessage.state = ApismMessage::MessageState::RESEND;

            // enqueue current message for resend:
            this->sendQueue.prepend(this->currentMessage.data);
            this->sendData();
            break;
        case ApismTcpClient::ExpectedResponse::JSON:
            this->currentMessage.state = ApismMessage::MessageState::INVALID;
            emit this->responseTimeout();
            break;
        }
        break;
    }

    // count resends
    this->resendCounter++;
}


/**
 * @brief Slot: the connect timeout timer expired.
 *
 * Emits connectTimeout() if there are still messages waiting in the queue.
 */
void ApismTcpClient::onConnectTimeoutTimerTimeout()
{
    if (this->sendQueue.size() == 0) {
        return;
    }

    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onConnectTimeoutTimerTimeout() -> sendQueue.size() = " << this->sendQueue.size();

    emit this->connectTimeout();
}


/**
 * @brief Slot: socket state changed (debug output only).
 * @param socketState  new state of the socket
 */
void ApismTcpClient::onSocketStateChanged(QAbstractSocket::SocketState socketState)
{
    QString msg;

    switch (socketState) {
    case QAbstractSocket::UnconnectedState:
        msg = "UnconnectedState";
        break;
    case QAbstractSocket::HostLookupState:
        msg = "HostLookupState";
        break;
    case QAbstractSocket::ConnectingState:
        msg = "ConnectingState";
        break;
    case QAbstractSocket::ConnectedState:
        msg = "ConnectedState";
        break;
    case QAbstractSocket::BoundState:
        msg = "BoundState";
        break;
    case QAbstractSocket::ClosingState:
        msg = "ClosingState";
        break;
    case QAbstractSocket::ListeningState:
        msg = "ListeningState";
        break;
    }

    if (this->isDebug) {
        qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onSocketStateChanged() to: " << msg;
    }
}


/**
 * @brief Slot: the socket reported an error.
 * @param socketError  error code reported by the socket
 *
 * The error is logged. Only ConnectionRefusedError is handled further: it is
 * counted and, if messages are queued, a reconnect is scheduled after 1000 ms.
 */
void ApismTcpClient::onSocketErrorOccured(QAbstractSocket::SocketError socketError)
{
    QString msg;
    bool flagReconnect = false;

    switch (socketError) {
    case QAbstractSocket::ConnectionRefusedError:
        msg = "ConnectionRefusedError";
        flagReconnect = true;
        this->private_handleConnectionRefusedError();
        break;
    case QAbstractSocket::RemoteHostClosedError:
        msg = "RemoteHostClosedError";
        break;
    case QAbstractSocket::HostNotFoundError:
        msg = "HostNotFoundError";
        break;
    case QAbstractSocket::SocketAccessError:
        msg = "SocketAccessError";
        break;
    case QAbstractSocket::SocketResourceError:
        msg = "SocketResourceError";
        break;
    case QAbstractSocket::SocketTimeoutError:
        msg = "SocketTimeoutError";
        break;
    case QAbstractSocket::DatagramTooLargeError:
        msg = "DatagramTooLargeError";
        break;
    case QAbstractSocket::NetworkError:
        msg = "NetworkError";
        break;
    case QAbstractSocket::AddressInUseError:
        msg = "AddressInUseError";
        break;
    case QAbstractSocket::SocketAddressNotAvailableError:
        msg = "SocketAddressNotAvailableError";
        break;
    case QAbstractSocket::UnsupportedSocketOperationError:
        msg = "UnsupportedSocketOperationError";
        break;
    case QAbstractSocket::ProxyAuthenticationRequiredError:
        msg = "ProxyAuthenticationRequiredError";
        break;
    case QAbstractSocket::SslHandshakeFailedError:
        msg = "SslHandshakeFailedError";
        break;
    case QAbstractSocket::UnfinishedSocketOperationError:
        msg = "UnfinishedSocketOperationError";
        break;
    case QAbstractSocket::ProxyConnectionRefusedError:
        msg = "ProxyConnectionRefusedError";
        break;
    case QAbstractSocket::ProxyConnectionClosedError:
        msg = "ProxyConnectionClosedError";
        break;
    case QAbstractSocket::ProxyConnectionTimeoutError:
        msg = "ProxyConnectionTimeoutError";
        break;
    case QAbstractSocket::ProxyNotFoundError:
        msg = "ProxyNotFoundError";
        break;
    case QAbstractSocket::ProxyProtocolError:
        msg = "ProxyProtocolError";
        break;
    case QAbstractSocket::OperationError:
        msg = "OperationError";
        break;
    case QAbstractSocket::SslInternalError:
        msg = "SslInternalError";
        break;
    case QAbstractSocket::SslInvalidUserDataError:
        msg = "SslInvalidUserDataError";
        break;
    case QAbstractSocket::TemporaryError:
        msg = "TemporaryError";
        break;
    case QAbstractSocket::UnknownSocketError:
        msg = "UnknownSocketError";
        break;
    }

    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::ApismTcpClient::onSocketErrorOccured() -> " << msg;

    if (flagReconnect) {
        // try to reconnect for sending:
        if (this->sendQueue.size() > 0) {
            QTimer::singleShot(1000,
                               this,
                               [this]() { this->connectToHost(); }
            );
        }
    }
}


/**
 * @brief Count a refused connection attempt.
 *
 * While the counter is not above 5 it is only incremented; on the next call
 * after that the counter is reset and connectionRefusedError() is emitted.
 * The counter is also reset in onSocketConnected().
 */
void ApismTcpClient::private_handleConnectionRefusedError()
{
    if (this->connectionRefusedCounter > 5) {
        qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::ApismTcpClient::connectionRefusedError()";
        this->connectionRefusedCounter = 0;

        emit this->connectionRefusedError();
    }
    else {
        this->connectionRefusedCounter++;
    }
}


/**
 * @brief Stream an ExpectedResponse value as text ("JSON" or "STATE") to a QDebug stream.
 * @param debug     stream to write to
 * @param response  value to print
 * @return the stream
 */
QDebug operator<<(QDebug debug, ApismTcpClient::ExpectedResponse response)
{
    switch (response) {
    case ApismTcpClient::ExpectedResponse::JSON:
        debug << "JSON";
        break;
    case ApismTcpClient::ExpectedResponse::STATE:
        debug << "STATE";
        break;
    }
    return debug;
}