QT 线程池 + TCP 小试(二)实现通信功能
生活随笔
收集整理的這篇文章主要介紹了
QT 线程池 + TCP 小试(二)实现通信功能
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
qghtcpserver.cpp #include "qghtcpserver.h" #include <assert.h> #include <QTcpSocket> QGHTcpServer::QGHTcpServer(QObject *parent,int nPayLoad ): QTcpServer(parent),m_nPayLoad(nPayLoad) {assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);connect(this, SIGNAL(newConnection()), this, SLOT(new_client_recieved())); }QGHTcpServer::~QGHTcpServer() {} QList <QObject *> QGHTcpServer::clientsList() {return m_clientList.keys(); } void QGHTcpServer::SetPayload(int nPayload) {m_nPayLoad = nPayload;assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024); }void QGHTcpServer::new_client_recieved() {QTcpSocket * sock_client = nextPendingConnection();while (sock_client){connect(sock_client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));connect(sock_client, SIGNAL(disconnected()),this,SLOT(client_closed()));connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));connect(sock_client, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));m_clientList[sock_client] = 0;emit evt_NewClientConnected(sock_client);sock_client = nextPendingConnection();} } void QGHTcpServer::client_closed() {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_ClientDisconnected(pSock);m_buffer_sending.remove(pSock);m_buffer_sending_offset.remove(pSock);m_clientList.remove(pSock);pSock->deleteLater();} } void QGHTcpServer::new_data_recieved() {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock)emit evt_Data_recieved(pSock,pSock->readAll()); } void QGHTcpServer::some_data_sended(qint64 wsended) {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_Data_transferred(pSock,wsended);QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];while (list_sock_data.empty()==false){QByteArray & arraySending = *list_sock_data.begin();qint64 & currentOffset = *list_offset.begin();qint64 nTotalBytes = 
arraySending.size();assert(nTotalBytes>=currentOffset);qint64 nBytesWritten = pSock->write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));currentOffset += nBytesWritten;if (currentOffset>=nTotalBytes){list_offset.pop_front();list_sock_data.pop_front();}elsebreak;}} } void QGHTcpServer::displayError(QAbstractSocket::SocketError socketError) {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_SocketError(pSock,socketError);pSock->disconnectFromHost();} }void QGHTcpServer::SendDataToClient(QObject * objClient,const QByteArray & dtarray) {if (m_clientList.find(objClient)==m_clientList.end())return;QTcpSocket * pSock = qobject_cast<QTcpSocket*>(objClient);if (pSock&&dtarray.size()){QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];if (list_sock_data.empty()==true){qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));if (bytesWritten < dtarray.size()){list_sock_data.push_back(dtarray);list_offset.push_back(bytesWritten);}}else{list_sock_data.push_back(dtarray);list_offset.push_back(0);} } } void QGHTcpServer::BroadcastData(QObject * objClient,const QByteArray & dtarray) {for(QMap<QObject *,int>::iterator p = m_clientList.begin();p!=m_clientList.end();p++){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(p.key());if (pSock&&dtarray.size()&&pSock!=objClient){QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];if (list_sock_data.empty()==true){qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));if (bytesWritten < dtarray.size()){list_sock_data.push_back(dtarray);list_offset.push_back(bytesWritten);}else{list_sock_data.push_back(dtarray);list_offset.push_back(0);} }} } } void QGHTcpServer::KickAllClients() {QList<QObject *> clientList = m_clientList.keys();foreach(QObject * obj,clientList){QTcpSocket 
* pSock = qobject_cast<QTcpSocket*>(obj);if (pSock){pSock->disconnectFromHost();} }} 下一次,我會介紹最后的實現功能。
* 免分資源鏈接:點擊打開鏈接 http://download.csdn.net/detail/goldenhawking/4492378
有了線程池,我們下一步就利用 QTcpServer 搭建一個服務器,接受客戶端的連接,并把數據發送到線程池上。由于 QTcpServer 資料太多了,這里不再贅述。唯一值得注意的是,當客戶端退出時,如果線程池隊列中還有該客戶的信息,這個信息還會被處理,只是無法再發送回去而已。其實,還可實現成客戶端退出,就發一個信號到線程池,刪除自己的所有任務。這個也很簡單,但之所以沒有做,因為這些數據的處理結果可能還會被其他消費者(而非生產者自己)使用,最典型的例子是從工業傳感器上采集的數據,其生成的圖像需要存儲到設備中去。
QTcpSocket 的 write 方法默認是支持大體積數據的,即使一次發了 500MB 的數據,只要硬件資源可以承受,調用也會成功并立刻返回。接受者會以一定的載荷大小不停地觸發 readyRead,直到發送全部成功。但是,為了能夠觀察到并控制收發隊列中的數據包的大小、體積,我們在外層實現了一個發送隊列,每次以 payLoad 為大小發送數據包。這是從 MFC 中帶來的習慣,很難說好壞。
qghtcpserver.h
#ifndef QGHTCPSERVER_H #define QGHTCPSERVER_H#include <QTcpServer> #include <QMap> #include <QList>class QGHTcpServer : public QTcpServer {Q_OBJECTpublic:QGHTcpServer(QObject *parent,int nPayLoad = 4096);~QGHTcpServer();//踢出所有客戶void KickAllClients();QList <QObject *> clientsList();void SetPayload(int nPayload); private:QMap<QObject *,QList<QByteArray> > m_buffer_sending;QMap<QObject *,QList<qint64> > m_buffer_sending_offset;QMap<QObject*,int> m_clientList;int m_nPayLoad; public slots://新的客戶連接到來void new_client_recieved();//客戶連接被關閉void client_closed();//新的數據到來void new_data_recieved();//一批數據發送完畢void some_data_sended(qint64);//客戶端錯誤void displayError(QAbstractSocket::SocketError socketError);//向客戶端發送數據void SendDataToClient(QObject * objClient,const QByteArray & dtarray);//向客戶端廣播數據,不包括 objFromClientvoid BroadcastData(QObject * objFromClient,const QByteArray & dtarray); signals://錯誤信息void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);//新的客戶端連接void evt_NewClientConnected(QObject * client);//客戶端退出void evt_ClientDisconnected(QObject * client);//收到一批數據void evt_Data_recieved(QObject * ,const QByteArray & );//一批數據被發送void evt_Data_transferred(QObject * client,qint64); };#endif // QGHTCPSERVER_Hqghtcpserver.cpp #include "qghtcpserver.h" #include <assert.h> #include <QTcpSocket> QGHTcpServer::QGHTcpServer(QObject *parent,int nPayLoad ): QTcpServer(parent),m_nPayLoad(nPayLoad) {assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);connect(this, SIGNAL(newConnection()), this, SLOT(new_client_recieved())); }QGHTcpServer::~QGHTcpServer() {} QList <QObject *> QGHTcpServer::clientsList() {return m_clientList.keys(); } void QGHTcpServer::SetPayload(int nPayload) {m_nPayLoad = nPayload;assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024); }void QGHTcpServer::new_client_recieved() {QTcpSocket * sock_client = nextPendingConnection();while (sock_client){connect(sock_client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));connect(sock_client, 
SIGNAL(disconnected()),this,SLOT(client_closed()));connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));connect(sock_client, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));m_clientList[sock_client] = 0;emit evt_NewClientConnected(sock_client);sock_client = nextPendingConnection();} } void QGHTcpServer::client_closed() {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_ClientDisconnected(pSock);m_buffer_sending.remove(pSock);m_buffer_sending_offset.remove(pSock);m_clientList.remove(pSock);pSock->deleteLater();} } void QGHTcpServer::new_data_recieved() {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock)emit evt_Data_recieved(pSock,pSock->readAll()); } void QGHTcpServer::some_data_sended(qint64 wsended) {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_Data_transferred(pSock,wsended);QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];while (list_sock_data.empty()==false){QByteArray & arraySending = *list_sock_data.begin();qint64 & currentOffset = *list_offset.begin();qint64 nTotalBytes = arraySending.size();assert(nTotalBytes>=currentOffset);qint64 nBytesWritten = pSock->write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));currentOffset += nBytesWritten;if (currentOffset>=nTotalBytes){list_offset.pop_front();list_sock_data.pop_front();}elsebreak;}} } void QGHTcpServer::displayError(QAbstractSocket::SocketError socketError) {QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_SocketError(pSock,socketError);pSock->disconnectFromHost();} }void QGHTcpServer::SendDataToClient(QObject * objClient,const QByteArray & dtarray) {if (m_clientList.find(objClient)==m_clientList.end())return;QTcpSocket * pSock = qobject_cast<QTcpSocket*>(objClient);if 
(pSock&&dtarray.size()){QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];if (list_sock_data.empty()==true){qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));if (bytesWritten < dtarray.size()){list_sock_data.push_back(dtarray);list_offset.push_back(bytesWritten);}}else{list_sock_data.push_back(dtarray);list_offset.push_back(0);} } } void QGHTcpServer::BroadcastData(QObject * objClient,const QByteArray & dtarray) {for(QMap<QObject *,int>::iterator p = m_clientList.begin();p!=m_clientList.end();p++){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(p.key());if (pSock&&dtarray.size()&&pSock!=objClient){QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];if (list_sock_data.empty()==true){qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));if (bytesWritten < dtarray.size()){list_sock_data.push_back(dtarray);list_offset.push_back(bytesWritten);}else{list_sock_data.push_back(dtarray);list_offset.push_back(0);} }} } } void QGHTcpServer::KickAllClients() {QList<QObject *> clientList = m_clientList.keys();foreach(QObject * obj,clientList){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(obj);if (pSock){pSock->disconnectFromHost();} }} 下一次,我會介紹最后的實現功能。
總結
以上是生活随笔為你收集整理的QT 线程池 + TCP 小试(二)实现通信功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Deepin系统更新apt-get源
- 下一篇: s3c2440移植MQTT