Необходимо написать сервер, который принимает данные от клиента(ов), обрабатывает их и сразу отправляет обратно, используя неблокирующие сокеты.
Клиент работает под Windows и использует Indy.IdTCPClient и методы WriteStream/ReadStream.
Ниже пример того, что я уже сделал.
При одном клиенте всё работает, а при нескольких начинают пропадать пакеты.
using namespace std;
// Per-client record that main fills in and hands to the client thread.
struct cln_info_t
{
    int id;       // sequential client number assigned by the accept loop
    int sock;     // connected socket descriptor returned by accept()
    int snd_buf;  // SO_SNDBUF value that main queried on the listening socket
    int rcv_buf;  // never assigned in the visible code
};
// Descriptors of the client sockets accepted by main.
set<int> client_socks;
// Handles of the threads started for accepted clients.
set<pthread_t> client_threads;
// Global mutex shared by main and the client threads. It is statically
// initialised so that it is a valid mutex before the first lock call;
// POSIX does not define the behaviour of locking an uninitialised mutex.
pthread_mutex_t msock = PTHREAD_MUTEX_INITIALIZER;
/*
 * Sends the first ASize bytes of `stream` to `sock` in chunks of at most
 * iBlock bytes.
 *
 * sock   - connected stream socket (blocking or non-blocking).
 * stream - source of the data; its position is reset to 0 first and then
 *          advanced by exactly the number of bytes the kernel accepted.
 * ASize  - number of bytes to send; nothing is sent when it is <= 0.
 *
 * Returns 0 when everything was sent, -1 on a socket error or when the
 * stream throws.
 */
int WriteStream( int sock, TMyMemoryStream *stream, const int ASize )
{
    // Ask for EPIPE instead of SIGPIPE when the peer has gone away, so a
    // disconnecting client cannot terminate the whole server process.
#ifdef MSG_NOSIGNAL
    const int iSendFlags = MSG_NOSIGNAL;
#else
    const int iSendFlags = 0;
#endif
    int iSize = ASize;
    char Buf[iBlock];
    try
    {
        stream->SetPosition( 0 );
        while( iSize > 0 )
        {
            const int iWriteSize = iBlock > iSize ? iSize : iBlock;
            const int iPos = stream->GetPosition();
            stream->ReadBuffer( Buf, iWriteSize );
            const int iBytesSend = send( sock, Buf, iWriteSize, iSendFlags );
            if( iBytesSend < 0 )
            {
                // send() reports failure with -1, not 0.
                const int iErr = errno;
                if( iErr != EAGAIN && iErr != EWOULDBLOCK && iErr != EINTR )
                {
                    perror( "Error writing Stream to Socket!!!" );
                    return -1;
                }
                // Nothing was sent: rewind to the start of this chunk so the
                // retry sends the same bytes instead of skipping them.
                stream->SetPosition( iPos );
                if( iErr != EINTR )
                {
                    // Non-blocking socket with a full send buffer: sleep until
                    // it is writable again instead of spinning.
                    fd_set writeset;
                    FD_ZERO( &writeset );
                    FD_SET( sock, &writeset );
                    select( sock + 1, NULL, &writeset, NULL, NULL );
                }
                continue;
            }
            // A short send is normal; continue right after the last byte
            // that was actually accepted.
            iSize -= iBytesSend;
            stream->SetPosition( iPos + iBytesSend );
        }
    }
    catch(...)
    {
        // A failing stream cannot be retried meaningfully; report the error
        // instead of looping forever.
        return -1;
    }
    return 0;
}
/*
 * Reads exactly ASize bytes from `sock` into `stream`.
 *
 * sock   - connected stream socket (blocking or non-blocking).
 * stream - destination; cleared first, then the received bytes are appended.
 * ASize  - number of bytes to read; nothing is read when it is <= 0.
 *
 * Returns the number of bytes read (ASize, or 0 when ASize <= 0), or -1
 * when the peer closed the connection or a socket error occurred.
 */
int ReadStream( int sock, TMyMemoryStream *stream, const int ASize )
{
    int iSize = 0;
    stream->Clear();
    char Buf[iBlock];
    while( iSize < ASize )
    {
        // Sleep until data is available instead of polling with usleep().
        fd_set readset;
        FD_ZERO( &readset );
        FD_SET( sock, &readset );
        if( select( sock + 1, &readset, NULL, NULL, NULL ) < 0 )
        {
            if( errno == EINTR )
                continue;
            perror( "Error reading Stream from Socket!!!" );
            return -1;
        }

        // Never ask for more than is still missing: anything beyond ASize
        // belongs to the next message and must stay in the socket buffer.
        const int iLeft = ASize - iSize;
        const int iWanted = iBlock > iLeft ? iLeft : iBlock;
        const int iBytesRead = recv( sock, Buf, iWanted, 0 );
        if( iBytesRead == 0 )
        {
            perror( "Connection Closed!!!" );
            return -1;
        }
        if( iBytesRead < 0 )
        {
            if( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR )
                continue;
            perror( "Error reading Stream from Socket!!!" );
            return -1;
        }
        stream->WriteBuffer( Buf, iBytesRead );
        iSize += iBytesRead;
    }
    return iSize;
}
static void *client_thread( void *arg )
{
cln_info_t cln_info =*(cln_info_t *)arg;
bool disconnected = false;
while( !disconnected )
{
usleep(100);
//cout << "stream created!" << endl;
TMyMemoryStream *stream = new TMyMemoryStream();
pthread_mutex_lock( &msock );
int br = ReadStream( cln_info.sock, stream, sizeof(int) );
pthread_mutex_unlock( &msock );
if( br == sizeof(int) )
{
stream->SetPosition( 0 );
int iSize;
stream->ReadBuffer( &iSize, sizeof( int ) );
//cout << "iSize = " << iSize << endl;
stream->Clear();
pthread_mutex_lock( &msock );
br = ReadStream( cln_info.sock, stream, iSize );
pthread_mutex_unlock( &msock );
}
//else
//br = -1;
//cout << "client " << cln_info.id << " recv file!" << br << endl;
if( br == -1 )
{
cout << "client " << cln_info.id << ": client disconnected!!!" << endl;
close( cln_info.sock );
set<int>::iterator it = client_socks.find( cln_info.sock );
if( it != client_socks.end() )
client_socks.erase( *it );
disconnected = true;
}
else
{
int is = stream->GetSize();
pthread_mutex_lock( &msock );
WriteStream( cln_info.sock, stream, is );
pthread_mutex_unlock( &msock );
//cout << "client " << cln_info.id << " send file!" << endl;
}
delete stream;
//cout << "stream deleted" << endl;
}
return (void *)true;
};
int main()
{
int listener;
struct sockaddr_in addr;
cout << "Server Started!" << endl;
listener = socket( AF_INET, SOCK_STREAM, 0 );
int snd_buf;
socklen_t len = sizeof(int);
getsockopt( listener, SOL_SOCKET,SO_SNDBUF, &snd_buf, &len );
cout << snd_buf << endl;
if( listener <= 0 )
{
perror("socket");
close( listener );
exit(1);
}
//int flags1 = fcntl( listener, F_GETFL, 0 );
//fcntl( listener, F_SETFL, flags1 | O_NONBLOCK);
addr.sin_family = AF_INET;
addr.sin_port = htons(3425);
addr.sin_addr.s_addr = INADDR_ANY;
if( bind( listener, (struct sockaddr *)&addr, sizeof( addr ) ) < 0 )
{
perror("bind");
close( listener );
exit(2);
}
listen( listener, SOMAXCONN );
int ic = 0;
client_socks.clear();
client_threads.clear();
while(1)
{
usleep(100);
fd_set readset;
FD_ZERO( &readset );
FD_SET( listener, &readset );
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 100;
int ires = select( listener + 1, &readset, NULL, NULL, &timeout );
if( ires < 0 )
{
perror("select");
close( listener );
exit(3);
}
else if( ires == 0 )
continue;
if( FD_ISSET( listener, &readset ) )
{
pthread_mutex_lock( &msock );
int sock = accept( listener, NULL, NULL );
pthread_mutex_unlock( &msock );
if( sock < 0 )
{
perror("accept");
close( listener );
exit(3);
}
cout << "client connected!!" << endl;
//int flags = fcntl( sock, F_GETFL, 0 );
//fcntl( sock, F_SETFL, flags | O_NONBLOCK);
client_socks.insert(sock);
cln_info_t info;
info.id = ic;
ic++;
info.sock = sock;
info.snd_buf = snd_buf;
pthread_t tid;
if( pthread_create(&tid, NULL, client_thread, (void *)&info ) == 0 )
client_threads.insert( tid );
}
}
close( listener );
return 0;
}