565 lines
17 KiB
C++
565 lines
17 KiB
C++
|
#include "ApismTcpClient.h"
|
||
|
|
||
|
#include <QHostAddress>
|
||
|
#include <QTimer>
|
||
|
|
||
|
/**
 * @brief ApismMessage::ApismMessage
 *
 * A default-constructed message starts in the INVALID state.
 */
ApismMessage::ApismMessage()
    : state(MessageState::INVALID)
{
}
|
||
|
|
||
|
|
||
|
/**
 * @brief ApismTcpClient::ApismTcpClient
 * @param hostname          host used by the parameterless connectToHost()
 * @param port              port as a decimal string, used by the parameterless connectToHost()
 * @param expectedResponse  selects which response handler processes incoming data
 * @param parent            QObject parent
 *
 * Creates the two single-shot timeout timers (10 s each) and the TCP socket
 * as children of this object and wires their signals to the private slots.
 */
ApismTcpClient::ApismTcpClient(const QString & hostname,
                               const QString & port,
                               ExpectedResponse expectedResponse,
                               QObject *parent)
    : QObject(parent)
    , hostname(hostname)
    , port(port)
    , responseTimerTimeoutCounter(0)
    , flag_selfClosed(false)
    , resendCounter(0)
    , connectionRefusedCounter(0)
    , expectedResponse(expectedResponse)
    , isDebug(false)
{
    // Both timers are configured identically, only the connected slot differs.
    const auto createSingleShotTimer = [this](int interval_ms) -> QTimer * {
        QTimer *timer = new QTimer(this);
        timer->setInterval(interval_ms);
        timer->setSingleShot(true);
        return timer;
    };

    this->responseTimeoutTimer = createSingleShotTimer(10000);
    connect(this->responseTimeoutTimer, SIGNAL(timeout()), this, SLOT(onResponseTimeoutTimerTimeout()));

    this->connectTimeoutTimer = createSingleShotTimer(10000);
    connect(this->connectTimeoutTimer, SIGNAL(timeout()), this, SLOT(onConnectTimeoutTimerTimeout()));

    socket = new QTcpSocket(this);
    connect(socket, SIGNAL(connected()), this, SLOT(onSocketConnected()));
    connect(socket, SIGNAL(disconnected()), this, SLOT(onSocketDisconnected()));
    connect(socket, SIGNAL(readyRead()), this, SLOT(onSocketReadyRead()));
    connect(socket, SIGNAL(bytesWritten(qint64)), this, SLOT(onSocketBytesWritten(qint64)));

    // QAbstractSocket::error is overloaded (getter and signal), hence the static_cast
    // to pick the signal. From Qt 5.15 on, the "errorOccurred" signal can be used
    // without the cast, see e.g.
    // https://stackoverflow.com/questions/35655512/compile-error-when-connecting-qtcpsocketerror-using-the-new-qt5-signal-slot
    connect(socket,
            static_cast<void (QTcpSocket::*)(QAbstractSocket::SocketError)>(&QAbstractSocket::error),
            this,
            &ApismTcpClient::onSocketErrorOccured);
    connect(socket, &QTcpSocket::stateChanged, this, &ApismTcpClient::onSocketStateChanged);

    // start without a message in flight
    this->currentMessage = ApismMessage();
}
|
||
|
|
||
|
|
||
|
void ApismTcpClient::setResponseTimeout(const quint32 timeout_ms)
|
||
|
{
|
||
|
this->responseTimeoutTimer->setInterval(timeout_ms);
|
||
|
}
|
||
|
|
||
|
void ApismTcpClient::setDebug(bool debug)
|
||
|
{
|
||
|
this->isDebug = debug;
|
||
|
}
|
||
|
|
||
|
void ApismTcpClient::connectToHost()
|
||
|
{
|
||
|
this->connectTimeoutTimer->start();
|
||
|
int portNumber = this->port.toInt();
|
||
|
this->socket->connectToHost(QHostAddress(this->hostname), portNumber);
|
||
|
}
|
||
|
|
||
|
void ApismTcpClient::connectToHost(const QString & hostname, const QString & port)
|
||
|
{
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::connectToHost(" << hostname << ", " << port << ")";
|
||
|
|
||
|
this->connectTimeoutTimer->start();
|
||
|
int portNumber = port.toInt();
|
||
|
socket->connectToHost(hostname, portNumber);
|
||
|
}
|
||
|
|
||
|
void ApismTcpClient::closeConnection()
|
||
|
{
|
||
|
this->flag_selfClosed = true;
|
||
|
socket->close();
|
||
|
}
|
||
|
|
||
|
bool ApismTcpClient::isConnected()
|
||
|
{
|
||
|
bool result = false;
|
||
|
QAbstractSocket::SocketState socketState = socket->state();
|
||
|
|
||
|
switch (socketState) {
|
||
|
case QAbstractSocket::UnconnectedState:
|
||
|
/* FALLTHRU */
|
||
|
case QAbstractSocket::HostLookupState:
|
||
|
/* FALLTHRU */
|
||
|
case QAbstractSocket::ConnectingState:
|
||
|
result = false;
|
||
|
break;
|
||
|
case QAbstractSocket::ConnectedState:
|
||
|
/* FALLTHRU */
|
||
|
case QAbstractSocket::BoundState:
|
||
|
result = true;
|
||
|
break;
|
||
|
case QAbstractSocket::ClosingState:
|
||
|
/* FALLTHRU */
|
||
|
case QAbstractSocket::ListeningState:
|
||
|
result = false;
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
return result;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/**
|
||
|
* @brief ApismTcpClient::sendData
|
||
|
* @param message
|
||
|
*
|
||
|
* Enqueue message, and try to send it
|
||
|
*/
|
||
|
void ApismTcpClient::sendData(const QByteArray &message)
|
||
|
{
|
||
|
if (this->isDebug) {
|
||
|
qCritical() << "ApismTcpClient::sendData(" << message << ")";
|
||
|
}
|
||
|
|
||
|
this->sendQueue.enqueue(message);
|
||
|
|
||
|
this->sendData();
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief ApismTcpClient::sendData
|
||
|
*
|
||
|
* Check connection and try to send message from queue.
|
||
|
*/
|
||
|
void ApismTcpClient::sendData()
|
||
|
{
|
||
|
if (this->sendQueue.size() == 0) {
|
||
|
if (this->isDebug) {
|
||
|
qCritical() << "ApismTcpClient::sendData()" << "no messages in send queue";
|
||
|
}
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// DEBUG
|
||
|
if (this->isDebug) {
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::sendData() sendQueue.size() = " << this->sendQueue.size();
|
||
|
}
|
||
|
|
||
|
switch (this->currentMessage.state) {
|
||
|
case ApismMessage::MessageState::INVALID:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::NEW:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::RESEND:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::ANSWERED:
|
||
|
// allow send message
|
||
|
if (this->isConnected()) {
|
||
|
this->private_sendData();
|
||
|
}
|
||
|
else {
|
||
|
this->connectToHost();
|
||
|
}
|
||
|
break;
|
||
|
case ApismMessage::MessageState::SENT:
|
||
|
// wait for answer...
|
||
|
if (this->isDebug) {
|
||
|
qCritical() << " ... wait for answer";
|
||
|
}
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief ApismTcpClient::private_sendData
|
||
|
*
|
||
|
* Precondition:
|
||
|
* - queue is not empty,
|
||
|
* - socket state is connected
|
||
|
* - current message is ANSWERED or INVALID
|
||
|
*/
|
||
|
void ApismTcpClient::private_sendData()
|
||
|
{
|
||
|
// take message from queue
|
||
|
this->currentMessage.data = this->sendQueue.dequeue();
|
||
|
this->currentMessage.state = ApismMessage::MessageState::SENT;
|
||
|
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::send: " << QString(this->currentMessage.data);
|
||
|
|
||
|
socket->write(this->currentMessage.data);
|
||
|
socket->flush();
|
||
|
|
||
|
// start timeoutTimer
|
||
|
this->responseTimeoutTimer->start();
|
||
|
}
|
||
|
|
||
|
|
||
|
void ApismTcpClient::onSocketConnected()
|
||
|
{
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << "): Connected!";
|
||
|
|
||
|
this->connectTimeoutTimer->stop();
|
||
|
this->connectionRefusedCounter = 0;
|
||
|
|
||
|
switch (this->currentMessage.state) {
|
||
|
case ApismMessage::MessageState::INVALID:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::NEW:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::RESEND:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::ANSWERED:
|
||
|
// allow send next message
|
||
|
if (this->sendQueue.size() > 0) {
|
||
|
this->private_sendData();
|
||
|
}
|
||
|
break;
|
||
|
case ApismMessage::MessageState::SENT:
|
||
|
// wait for answer...
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void ApismTcpClient::onSocketDisconnected()
|
||
|
{
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << "): Disconnected!";
|
||
|
|
||
|
if (!this->flag_selfClosed) {
|
||
|
qCritical() << " -> SocketErrorString: " << socket->errorString();
|
||
|
}
|
||
|
this->flag_selfClosed = false;
|
||
|
|
||
|
if ( (socket->error() == QAbstractSocket::SocketError::RemoteHostClosedError) &&
|
||
|
(this->responseTimeoutTimer->isActive()) )
|
||
|
{
|
||
|
this->responseTimeoutTimer->stop();
|
||
|
qCritical() << " -> still waiting for response ";
|
||
|
|
||
|
switch (this->expectedResponse) {
|
||
|
case ApismTcpClient::ExpectedResponse::STATE:
|
||
|
// try resend:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::RESEND;
|
||
|
// enqeue current message for resend:
|
||
|
this->sendQueue.prepend(this->currentMessage.data);
|
||
|
|
||
|
this->sendData();
|
||
|
break;
|
||
|
case ApismTcpClient::ExpectedResponse::JSON:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::INVALID;
|
||
|
emit this->connectionClosedByRemoteHost();
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
 * @brief ApismTcpClient::onSocketBytesWritten
 * @param bytes  number of bytes reported by the socket's bytesWritten signal
 *
 * Debug aid only: logs the byte count when debug output is enabled.
 */
void ApismTcpClient::onSocketBytesWritten(qint64 bytes)
{
    if (!this->isDebug) {
        return;
    }

    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onSocketBytesWritten() -> " << bytes << " bytes written";
}
|
||
|
|
||
|
void ApismTcpClient::onSocketReadyRead()
|
||
|
{
|
||
|
QByteArray readData;
|
||
|
|
||
|
// stop timeoutTimer
|
||
|
this->responseTimeoutTimer->stop();
|
||
|
|
||
|
readData = socket->readAll();
|
||
|
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::received: " << QString(readData);
|
||
|
|
||
|
switch (this->expectedResponse) {
|
||
|
case ApismTcpClient::ExpectedResponse::JSON:
|
||
|
this->private_handleJSONResponse(readData);
|
||
|
break;
|
||
|
case ApismTcpClient::ExpectedResponse::STATE:
|
||
|
this->private_handleStateResponse(readData);
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
|
||
|
if (this->sendQueue.size() > 0) {
|
||
|
QTimer::singleShot(1000,
|
||
|
this,
|
||
|
[this]() { this->sendData(); }
|
||
|
);
|
||
|
}
|
||
|
else {
|
||
|
this->flag_selfClosed = true;
|
||
|
this->socket->close();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/******************************************************************************
|
||
|
* response handler:
|
||
|
*/
|
||
|
/**
 * @brief ApismTcpClient::private_handleJSONResponse
 * @param responseMessage  raw data read from the socket
 *
 * Forwards the data via receivedData() and afterwards marks the current
 * message as answered, which allows the next message to be sent.
 */
void ApismTcpClient::private_handleJSONResponse(QByteArray & responseMessage)
{
    // the signal is emitted before the state changes
    emit this->receivedData(responseMessage);

    resendCounter = 0;
    currentMessage.state = ApismMessage::MessageState::ANSWERED;
}
|
||
|
|
||
|
/* possible answers:
|
||
|
* "RECORD SAVED" --> everything is ok
|
||
|
* "RECORD WRITE ABORTED" --> initiate a (delayed) resend
|
||
|
*
|
||
|
*/
|
||
|
void ApismTcpClient::private_handleStateResponse(QByteArray & responseMessage)
|
||
|
{
|
||
|
QString responseMessageString = QString(responseMessage);
|
||
|
|
||
|
if (responseMessageString.contains("ABORTED")) {
|
||
|
// Try to resend later:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::RESEND;
|
||
|
// enqeue current message for resend:
|
||
|
this->sendQueue.prepend(this->currentMessage.data);
|
||
|
}
|
||
|
else
|
||
|
if (responseMessageString.contains("RECORD SAVED")) {
|
||
|
// allow send next message:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::ANSWERED;
|
||
|
this->resendCounter = 0;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/******************************************************************************
|
||
|
*/
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
void ApismTcpClient::onResponseTimeoutTimerTimeout()
|
||
|
{
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onResponseTimeoutTimerTimeout():";
|
||
|
|
||
|
|
||
|
switch (this->currentMessage.state) {
|
||
|
case ApismMessage::MessageState::INVALID:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::NEW:
|
||
|
/* FALLTHROUGH */
|
||
|
case ApismMessage::MessageState::ANSWERED:
|
||
|
// ignore
|
||
|
qCritical() << " -> ignore timeout";
|
||
|
return;
|
||
|
break;
|
||
|
case ApismMessage::MessageState::RESEND:
|
||
|
qCritical() << " -> timeout (RESEND)";
|
||
|
qCritical() << " -> resendCounter = " << this->resendCounter;
|
||
|
switch (this->expectedResponse) {
|
||
|
case ApismTcpClient::ExpectedResponse::STATE:
|
||
|
// try resend:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::RESEND;
|
||
|
// enqeue current message for resend:
|
||
|
this->sendQueue.prepend(this->currentMessage.data);
|
||
|
|
||
|
this->sendData();
|
||
|
break;
|
||
|
case ApismTcpClient::ExpectedResponse::JSON:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::INVALID;
|
||
|
emit this->responseTimeout();
|
||
|
break;
|
||
|
}
|
||
|
break;
|
||
|
case ApismMessage::MessageState::SENT:
|
||
|
// we still do not have a response:
|
||
|
|
||
|
switch (this->expectedResponse) {
|
||
|
case ApismTcpClient::ExpectedResponse::STATE:
|
||
|
// try resend:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::RESEND;
|
||
|
// enqeue current message for resend:
|
||
|
this->sendQueue.prepend(this->currentMessage.data);
|
||
|
|
||
|
this->sendData();
|
||
|
break;
|
||
|
case ApismTcpClient::ExpectedResponse::JSON:
|
||
|
this->currentMessage.state = ApismMessage::MessageState::INVALID;
|
||
|
emit this->responseTimeout();
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
// count resends
|
||
|
this->resendCounter++;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
void ApismTcpClient::onConnectTimeoutTimerTimeout()
|
||
|
{
|
||
|
if (this->sendQueue.size() == 0) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onConnectTimeoutTimerTimeout() -> sendQueue.size() = " << this->sendQueue.size();
|
||
|
|
||
|
emit this->connectTimeout();
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
 * @brief ApismTcpClient::onSocketStateChanged
 * @param socketState  new state reported by the socket
 *
 * Debug aid only: logs the name of the new socket state when debug output
 * is enabled.
 */
void ApismTcpClient::onSocketStateChanged(QAbstractSocket::SocketState socketState)
{
    if (!this->isDebug) {
        return;
    }

    QString stateName;
    switch (socketState) {
    case QAbstractSocket::UnconnectedState: stateName = "UnconnectedState"; break;
    case QAbstractSocket::HostLookupState:  stateName = "HostLookupState";  break;
    case QAbstractSocket::ConnectingState:  stateName = "ConnectingState";  break;
    case QAbstractSocket::ConnectedState:   stateName = "ConnectedState";   break;
    case QAbstractSocket::BoundState:       stateName = "BoundState";       break;
    case QAbstractSocket::ClosingState:     stateName = "ClosingState";     break;
    case QAbstractSocket::ListeningState:   stateName = "ListeningState";   break;
    }

    qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::onSocketStateChanged() to: " << stateName;
}
|
||
|
|
||
|
void ApismTcpClient::onSocketErrorOccured(QAbstractSocket::SocketError socketError)
|
||
|
{
|
||
|
QString msg;
|
||
|
bool flagReconnect = false;
|
||
|
|
||
|
switch (socketError) {
|
||
|
case QAbstractSocket::ConnectionRefusedError:
|
||
|
msg = "ConnectionRefusedError";
|
||
|
flagReconnect = true;
|
||
|
this->private_handleConnectionRefusedError();
|
||
|
break;
|
||
|
case QAbstractSocket::RemoteHostClosedError:
|
||
|
msg = "RemoteHostClosedError";
|
||
|
break;
|
||
|
case QAbstractSocket::HostNotFoundError:
|
||
|
msg = "HostNotFoundError";
|
||
|
break;
|
||
|
case QAbstractSocket::SocketAccessError:
|
||
|
msg = "SocketAccessError";
|
||
|
break;
|
||
|
case QAbstractSocket::SocketResourceError:
|
||
|
msg = "SocketResourceError";
|
||
|
break;
|
||
|
case QAbstractSocket::SocketTimeoutError:
|
||
|
msg = "SocketTimeoutError";
|
||
|
break;
|
||
|
case QAbstractSocket::DatagramTooLargeError:
|
||
|
msg = "DatagramTooLargeError";
|
||
|
break;
|
||
|
case QAbstractSocket::NetworkError:
|
||
|
msg = "NetworkError";
|
||
|
break;
|
||
|
case QAbstractSocket::AddressInUseError:
|
||
|
msg = "AddressInUseError";
|
||
|
break;
|
||
|
case QAbstractSocket::SocketAddressNotAvailableError:
|
||
|
msg = "SocketAddressNotAvailableError";
|
||
|
break;
|
||
|
case QAbstractSocket::UnsupportedSocketOperationError:
|
||
|
msg = "UnsupportedSocketOperationError";
|
||
|
break;
|
||
|
case QAbstractSocket::ProxyAuthenticationRequiredError:
|
||
|
msg = "ProxyAuthenticationRequiredError";
|
||
|
break;
|
||
|
case QAbstractSocket::SslHandshakeFailedError:
|
||
|
msg = "SslHandshakeFailedError";
|
||
|
break;
|
||
|
case QAbstractSocket::UnfinishedSocketOperationError:
|
||
|
msg = "UnfinishedSocketOperationError";
|
||
|
break;
|
||
|
case QAbstractSocket::ProxyConnectionRefusedError:
|
||
|
msg = "ProxyConnectionRefusedError";
|
||
|
break;
|
||
|
case QAbstractSocket::ProxyConnectionClosedError:
|
||
|
msg = "ProxyConnectionClosedError";
|
||
|
break;
|
||
|
case QAbstractSocket::ProxyConnectionTimeoutError:
|
||
|
msg = "ProxyConnectionTimeoutError";
|
||
|
break;
|
||
|
case QAbstractSocket::ProxyNotFoundError:
|
||
|
msg = "ProxyNotFoundError";
|
||
|
break;
|
||
|
case QAbstractSocket::ProxyProtocolError:
|
||
|
msg = "ProxyProtocolError";
|
||
|
break;
|
||
|
case QAbstractSocket::OperationError:
|
||
|
msg = "OperationError";
|
||
|
break;
|
||
|
case QAbstractSocket::SslInternalError:
|
||
|
msg = "SslInternalError";
|
||
|
break;
|
||
|
case QAbstractSocket::SslInvalidUserDataError:
|
||
|
msg = "SslInvalidUserDataError";
|
||
|
break;
|
||
|
case QAbstractSocket::TemporaryError:
|
||
|
msg = "TemporaryError";
|
||
|
break;
|
||
|
case QAbstractSocket::UnknownSocketError:
|
||
|
msg = "UnknownSocketError";
|
||
|
break;
|
||
|
|
||
|
}
|
||
|
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::ApismTcpClient::onSocketErrorOccured() -> " << msg;
|
||
|
|
||
|
if (flagReconnect) {
|
||
|
// try to reconnect for sending:
|
||
|
if (this->sendQueue.size() > 0) {
|
||
|
QTimer::singleShot(1000,
|
||
|
this,
|
||
|
[this]() { this->connectToHost(); }
|
||
|
);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
void ApismTcpClient::private_handleConnectionRefusedError()
|
||
|
{
|
||
|
if (this->connectionRefusedCounter > 5) {
|
||
|
qCritical() << "ApismTcpClient(" << this->expectedResponse << ")::ApismTcpClient::connectionRefusedError()";
|
||
|
this->connectionRefusedCounter = 0;
|
||
|
emit this->connectionRefusedError();
|
||
|
}
|
||
|
else {
|
||
|
this->connectionRefusedCounter++;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/**
 * @brief Streams the name of an ApismTcpClient::ExpectedResponse value.
 * @param debug     debug stream to write to
 * @param response  value to print ("JSON" or "STATE")
 * @return the debug stream
 *
 * A value that is neither JSON nor STATE writes nothing.
 */
QDebug operator<<(QDebug debug, ApismTcpClient::ExpectedResponse response)
{
    const char * label = nullptr;

    switch (response) {
    case ApismTcpClient::ExpectedResponse::STATE:
        label = "STATE";
        break;
    case ApismTcpClient::ExpectedResponse::JSON:
        label = "JSON";
        break;
    }

    if (label != nullptr) {
        debug << label;
    }
    return debug;
}
|