The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

форумы  помощь  поиск  регистрация  майллист  ВХОД  слежка  RSS
"socket, pthread чтение/запись боьших объемов для нескольких ..."
Вариант для распечатки  
Пред. тема | След. тема 
Форумы Программирование под UNIX (Public)
Изначальное сообщение [Проследить за развитием треда]

"socket, pthread чтение/запись боьших объемов для нескольких ..."  
Сообщение от ZPasha email(ok) on 25-Май-07, 13:39 
Необходимо написать сервер который принимает от клиента(ов) данные обрабатывает и сразу отправляет обратно на неблокирующих сокетах
Клиент по Виндо и использует Indy.IdTCPClient и методы Write/Read Stream
Ниже пример того что я уже сделал
При одном клиенте все ОК а при несокльких начинают пропадать пакеты

using namespace std;
struct cln_info_t
{
    int id;
    int sock;
    int snd_buf;
    int rcv_buf;
};
set<int> client_socks;
set<pthread_t> client_threads;
pthread_mutex_t msock;

int WriteStream( int sock, TMyMemoryStream *stream, const int ASize )
{
  int iSize = ASize;
  int iWriteSize = iBlock > iSize ? iSize : iBlock;
  stream->SetPosition( 0 ) ;
  char Buf[iBlock];
  while( iWriteSize > 0 )
  {
    try
    {
      memset( Buf, 0, iBlock );
      int iPos = stream->GetPosition();
      stream->ReadBuffer( Buf, iWriteSize );
      int iBytesSend = send( sock, Buf, iWriteSize, 0 );
      if( iBytesSend == 0 )
      {
        if( errno == EAGAIN || errno == EINTR )
        {
        //cout << "Socket was blocked!Read Again";
          continue;
        }
        else
        {
          perror( "Error writing Stream to Socket!!!" );
          return -1;
        }
      }
      iSize -= iBytesSend;
      stream->SetPosition( iPos + iBytesSend );
      iWriteSize = iBlock > iSize ? iSize : iBlock;
    }
    catch(...)
    {

    }
  }
  return iWriteSize;
}

int ReadStream( int sock, TMyMemoryStream *stream, const int ASize )
{
  int iSize = 0;
  stream->Clear();
  char Buf[iBlock];
  while( iSize < ASize )
  {
    usleep(100);
    fd_set readset;
    FD_ZERO( &readset );
    FD_SET( sock, &readset );
    if( FD_ISSET(sock, &readset) )
    {
      FD_ZERO( &readset );
      FD_SET( sock, &readset );
      memset( Buf, 0, iBlock );
      int iBytesRead = recv( sock, Buf, iBlock, 0 );      
      if( iBytesRead == 0 )
      {
        perror( "Connection Closed!!!" );
        return -1;
      }
      else if( iBytesRead < 0 )
      {
        if( errno == EAGAIN || errno == EINTR )
        {
        //cout << "Socket was blocked!Read Again";
          continue;
        }
        else
        {
          perror( "Error reading Stream from Socket!!!" );
          return -1;
        }
      }
      stream->WriteBuffer( Buf, iBytesRead );
      iSize += iBytesRead;
    }
  }
  return iSize;
}

static void *client_thread( void *arg )
{
  cln_info_t cln_info =*(cln_info_t *)arg;
  bool disconnected = false;
  while( !disconnected )
  {
    usleep(100);
      //cout << "stream created!" << endl;
      TMyMemoryStream *stream = new TMyMemoryStream();
      pthread_mutex_lock( &msock );
      int br = ReadStream( cln_info.sock, stream, sizeof(int) );
      pthread_mutex_unlock( &msock );
      if( br == sizeof(int) )
      {  
        stream->SetPosition( 0 );
        int iSize;
        stream->ReadBuffer( &iSize, sizeof( int ) );
        //cout << "iSize = " << iSize << endl;
        stream->Clear();
        pthread_mutex_lock( &msock );
        br = ReadStream( cln_info.sock, stream, iSize );
        pthread_mutex_unlock( &msock );
      }
      //else
        //br = -1;
      //cout << "client " << cln_info.id << " recv file!" << br << endl;
      if( br == -1 )
      {
        cout << "client " << cln_info.id << ": client disconnected!!!" << endl;
        close( cln_info.sock );
        set<int>::iterator it = client_socks.find( cln_info.sock );
        if( it != client_socks.end() )
        client_socks.erase( *it );
        disconnected = true;
      }
      else
      {
        int is = stream->GetSize();
        pthread_mutex_lock( &msock );
        WriteStream( cln_info.sock, stream, is );
        pthread_mutex_unlock( &msock );
        //cout << "client " << cln_info.id << " send file!" << endl;
      }
      delete stream;
      //cout << "stream deleted" << endl;
  }
  return (void *)true;
};

int main()
{
int listener;
struct sockaddr_in addr;
cout << "Server Started!" << endl;
listener = socket( AF_INET, SOCK_STREAM, 0 );
int snd_buf;
socklen_t len = sizeof(int);
getsockopt( listener, SOL_SOCKET,SO_SNDBUF, &snd_buf, &len );
cout << snd_buf << endl;
if( listener <= 0 )
{
  perror("socket");
  close( listener );
  exit(1);
}
//int flags1 = fcntl( listener, F_GETFL, 0 );
//fcntl( listener, F_SETFL, flags1 | O_NONBLOCK);
addr.sin_family = AF_INET;
addr.sin_port = htons(3425);
addr.sin_addr.s_addr = INADDR_ANY;
if( bind( listener, (struct sockaddr *)&addr, sizeof( addr ) ) < 0 )
{
  perror("bind");
  close( listener );
  exit(2);
}
listen( listener, SOMAXCONN );

int ic = 0;
client_socks.clear();
client_threads.clear();
while(1)
{
  usleep(100);
  fd_set readset;
  FD_ZERO( &readset );
  FD_SET( listener, &readset );
  
  timeval timeout;
  timeout.tv_sec = 0;
  timeout.tv_usec = 100;
  int ires = select( listener + 1, &readset, NULL, NULL, &timeout );
  if( ires < 0 )
  {
   perror("select");
   close( listener );
   exit(3);
  }
  else if( ires == 0 )
    continue;
  if( FD_ISSET( listener, &readset ) )
  {
    pthread_mutex_lock( &msock );
    int sock = accept( listener, NULL, NULL );
    pthread_mutex_unlock( &msock );
    if( sock < 0 )
    {
  perror("accept");
  close( listener );
  exit(3);
    }
    cout << "client connected!!" << endl;
    //int flags = fcntl( sock, F_GETFL, 0 );
    //fcntl( sock, F_SETFL, flags | O_NONBLOCK);
    client_socks.insert(sock);
    cln_info_t info;
    info.id = ic;
    ic++;
    info.sock = sock;
    info.snd_buf = snd_buf;
    pthread_t tid;
    if( pthread_create(&tid, NULL, client_thread, (void *)&info ) == 0 )
      client_threads.insert( tid );
  }
}
close( listener );
return 0;
}

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

 Оглавление

Сообщения по теме [Сортировка по времени, UBB]


1. "socket, pthread чтение/запись боьших объемов для нескольких ..."  
Сообщение от vic (??) on 25-Май-07, 14:09 
send, recv не проверяются на -1

и нафик неблокирующие сокеты? select/poll

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

2. "socket, pthread чтение/запись боьших объемов для нескольких ..."  
Сообщение от ZPasha email(ok) on 25-Май-07, 14:13 
>send, recv не проверяются на -1
вобщето проверяются в Read/WriteStream
>
>и нафик неблокирующие сокеты? select/poll
пробовал и блокирующие проблема не исчезает
с select проблема тоже осталась


Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

3. "socket, pthread чтение/запись боьших объемов для нескольких ..."  
Сообщение от vic (??) on 25-Май-07, 14:19 
>>send, recv не проверяются на -1
>вобщето проверяются в Read/WriteStream

    if( iBytesSend == 0 )
????

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

4. "socket, pthread чтение/запись боьших объемов для нескольких ..."  
Сообщение от NuINu (??) on 25-Май-07, 14:24 
>Необходимо написать сервер который принимает от клиента(ов) данные обрабатывает и сразу отправляет
>обратно на неблокирующих сокетах
>Клиент по Виндо и использует Indy.IdTCPClient и методы Write/Read Stream
>Ниже пример того что я уже сделал
>При одном клиенте все ОК а при несокльких начинают пропадать пакеты

        pthread_mutex_lock( &msock );
        br = ReadStream( cln_info.sock, stream, iSize );
        pthread_mutex_unlock( &msock );
и ему подобный код в функции обработки потоков при однодновременной работе нескольких потоков работать не будут!!! они и вызывают сбой!

Мьютексы надо блокировать лишь затем что бы получить доступ к общей структуре списка, а там нужно выставить пометку что данная структура обрабатывается таким то потоком, а перед этим проверить не обрабатывает ли данную структуру уже другой поток.
А при завершении также блокировать и освобождать структуру. И работать уже непосредственно с чтением и записью сокетов без всяких установленных блокировок.

Иначе у тебя все потоки висят в блокировке а обрабатывается только один.

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

Архив | Удалить

Индекс форумов | Темы | Пред. тема | След. тема
Оцените тред (1=ужас, 5=супер)? [ 1 | 2 | 3 | 4 | 5 ] [Рекомендовать для помещения в FAQ]




Партнёры:
PostgresPro
Inferno Solutions
Hosting by Hoster.ru
Хостинг:

Закладки на сайте
Проследить за страницей
Created 1996-2025 by Maxim Chirkov
Добавить, Поддержать, Вебмастеру