首页 资讯 社群 我的社区 搜索

Linux C多线程处理框架

linux5.net
2018-09-05 14:50:29

一个多线程处理的程序的框架,主要是从一个目录中读取要求的文件,进行文件解码,并将源目录的文件移到另一个目录。解码程序调用了一个可执行程序。具体代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <time.h>
#include <dirent.h>
#include <sys/stat.h>
#include <pthread.h>
void * decode_grb_thread(void * arg);
char* getTime();
void WriteMsg(FILE *fp, char *msg);
#define PATHNAME "/space/rdb/RDB/personal/xuyj/grb"
#define DIR_LEN 80
#define FILE_LEN 100
char pathname[DIR_LEN];
FILE *flogp=NULL;
#define NUM_THREADS 3
#define NUM_FILES 200
char files[NUM_FILES][FILE_LEN];
int fileNum[NUM_THREADS];
pthread_t a_thread[NUM_THREADS] ;
main()
{
 int i,j;
 int ret;
 DIR *dirp;
 char errmsg[300];
 char msg[300];
 struct dirent *directp;
 int flag;
 char filename[FILE_LEN];
 char fullfilename[FILE_LEN+DIR_LEN];
 char mvname[FILE_LEN+DIR_LEN];
 int count_file_num;
 long fplog_prev,fplog_current;
 int ct[10];
 struct stat status;
 /* added by juling on 20060413 */
 char cpname[FILE_LEN+DIR_LEN];
 char cpsh[FILE_LEN+DIR_LEN];
 /* added by juling on 20070213 */
 char errfilename[FILE_LEN];
 char postfix[10];
 char filetime[15];
 char rtime[15];
 ret = 0;
 /* new */
 int lots_of_threads;
 void *thread_result;
 memset(&a_thread, 0, sizeof(a_thread));
 char logfname[127];
 time_t timer;
 struct tm *tim;
 int hour;
 char now_time[20];
 /*
 * the variable used to record file number of current
 * directory
 */
 count_file_num = 0;
 /* Get log file name */
 timer = time(NULL);
 tim = gmtime(&timer);
 hour = tim->tm_hour;
 sprintf(now_time, "%04d-%02d-%02d-%02d:%02d:%02d", tim->tm_year,
  tim->tm_mon+1, tim->tm_mday, hour,
  tim->tm_min, tim->tm_sec);
 /*日志*/
 sprintf(logfname, "%s/%s/rdb%04d%02d%02dgrib.log",PATHNAME, "log",
  tim->tm_year+1900, tim->tm_mon+1, tim->tm_mday);
 /* Open log file */
 if((flogp = fopen(logfname, "a+")) == NULL){
  printf("Error open ftp_grib log file: %sn", logfname);
  exit(0);
 }
 /*
 * concatenate path name
 */
 /* modified by juling on 20060411 */
 sprintf(pathname,"%s/%s",PATHNAME,"/source");
 strcat(pathname,"");
 if ( strlen(pathname) > DIR_LEN )
 {
  sprintf(errmsg,"101:length of source path name is too long.");
  WriteMsg(flogp, errmsg);
  exit(0);
 }
 if((dirp=opendir(pathname)) == NULL)
 {
  /*
  * write error message
  */
  sprintf(errmsg,"Can't open %s and error number=%dn",pathname,errno);
  WriteMsg(flogp, errmsg);
  return ;
 }
 else
 {
  /*
  * write process informaton into log
  */
  sprintf(msg,"open %s",pathname);
  WriteMsg(flogp, msg);
 }
 while((directp=(struct dirent *)readdir(dirp)) != NULL)
 {
  strcpy(filename,directp->d_name);
  sprintf(fullfilename,"%s/%s",pathname,filename);
  /*
  * Judge if file belongs to be processed by present
  * process
  */
  /*不为C3E继续下一个文件*/
  if (strstr(directp->d_name,"C3E")==NULL)
  {
   continue;
  }
  if ( strlen(directp->d_name) > FILE_LEN )
  {
   sprintf(errmsg,"102:<%s>length of source file name is too long .",filename);
   WriteMsg(flogp, errmsg);
   continue;
  }
  /* modified by juling on 20051227 */
  if( strlen(directp->d_name) < 18 )
  {
   /*remove(fullfilename);*/
   sprintf(errmsg,"103:<%s>length of source file name is too short.",filename);
   WriteMsg(flogp, errmsg);
   continue;
  }
  if ( strstr(directp->d_name,".tmp") != NULL )
  {
   continue;
  }
  if(stat(fullfilename,&status) == -1) continue;
  if(!S_ISREG(status.st_mode)) continue;
  if(status.st_size == 0) continue;
  strcpy(files[count_file_num],filename);
  count_file_num++;
  /*---一批只处理NUM_FILES个文件----*/
  if(count_file_num>NUM_FILES-1) break;
 } /* end while((directp=(struct dirent *)readdir(dirp))!=NULL) */
 /*
 * write total file number in current directory into log
 */
 sprintf(msg,"Total file number in the directory is %d.n",count_file_num);
 WriteMsg(flogp, msg);
 closedir(dirp);
 /* 多线程处理 */
 for( i=0;i<count_file_num;i++)
 {
  printf("count_file_num is (%d)n ",i);
  flag=0;
  for(lots_of_threads=0;lots_of_threads<NUM_THREADS-1;lots_of_threads++)
  {
   if(a_thread[lots_of_threads]==0)
   {
    fileNum[lots_of_threads]=i;
    ret = pthread_create(&(a_thread[lots_of_threads]), NULL, decode_grb_thread, (void *)lots_of_threads);
    if (ret != 0)
    {
     sprintf(msg,"<Thread> : Thread (%d) creation failed",lots_of_threads);
     WriteMsg(flogp, msg);
     a_thread[lots_of_threads]=0;
    }
    flag=1;
    break;
   }
  }
  if(flag==1) continue;
  lots_of_threads = NUM_THREADS - 1;
  printf("thread is %d",lots_of_threads);
  fileNum[lots_of_threads]=i;
  ret = pthread_create(&(a_thread[lots_of_threads]), NULL, decode_grb_thread, (void *)lots_of_threads);
  sleep(50);
  if(a_thread[lots_of_threads]!=0)
  {
   ret = pthread_join(a_thread[lots_of_threads], &thread_result);
   if (ret == 0)
   {
    sprintf(msg,"<Thread> : Thread (%d) Picked upn ",lots_of_threads);
   } else {
    sprintf(msg,"<Thread> :Thread (%d) pthread_join failedn ",lots_of_threads);
   }
   WriteMsg(flogp, msg);
  }
 }
 /*---最后等待所有线程结束 */
 for(lots_of_threads=0;lots_of_threads<NUM_THREADS;lots_of_threads++)
 {
  if(a_thread[lots_of_threads]!=0)
  {
   ret = pthread_join(a_thread[lots_of_threads], &thread_result);
   if (ret == 0)
   {
    sprintf(msg,"<Thread> :Thread (%d) Picked upn ",lots_of_threads);
   } else {
    sprintf(msg,"<Thread> :Thread (%d) pthread_join failedn ",lots_of_threads);
   }
   WriteMsg(flogp, msg);
  }
 }
 fclose(flogp);
 exit(0);
}
 
void * decode_grb_thread(void * arg)
{
 int ret=0;
 char starttime[30];
 char endtime[30];
 char errmsg[300];
 char msg[300];
 /* added by xuyj on 20130301 */
 char fullfilename[FILE_LEN+DIR_LEN];
 char mvname[FILE_LEN+DIR_LEN];
 int count_file_num;
 struct stat status;
 /* added by juling on 20060413 */
 char cpname[FILE_LEN+DIR_LEN];
 char cpsh[FILE_LEN+DIR_LEN];
 /* added by juling on 20070213 */
 char errfilename[FILE_LEN];
 char filename[FILE_LEN];
 char exefilename[FILE_LEN+10];
 /* 线程记录 */
 int thread_num=(int)arg;
 int fnum=fileNum[thread_num];
 /*文件名*/
 sprintf(filename,"%s",files[fnum]);
 sprintf(fullfilename,"%s/%s",pathname,filename);
 sprintf(starttime,"%s",getTime());
 sprintf(msg,"<INFO> %s,Start_time:%s,thread <%d>",filename,starttime,thread_num);
 /*记录开始时间*/
 WriteMsg(flogp,msg);
 if ( strstr(filename,"C3E") != NULL )
 {
  /*
  * process grb file
  */
  sprintf(exefilename,"ec_data_make_ftp %s",fullfilename);
  ret=system(exefilename);
  /*ret = main_grib(fullfilename);*/
 }
 else{
  a_thread[thread_num]=0;
  return;
 }
 /*
 * time when finishing file process
 */
 sprintf(endtime,"%s",getTime());
 sprintf(msg,"<INFO> %s,End_time:%s,thread <%d>",filename,endtime,thread_num);
 /*记录结束时间*/
 WriteMsg(flogp,msg);
 fprintf(flogp,"***************************************nn");
 /*
 * give  error file name
 */
 strcpy(errfilename,filename);
 strcat(errfilename,".err");
 /*
 * move file to trash directory
 */
 /* modified by juling on 20051109 */
 /*
 * if file is defined bfile type,then moves it to
 * bfile directory instead of trash directory
 */
 /* modified by wangyan on 20061120 */
 if ( strstr(filename,"C3E") != NULL )
 {
  /*  output error file  */
  if ( ret != 0 )
  {
   sprintf(mvname,"%s/%s/%s",PATHNAME,"error",errfilename);
  }
  else
  {
   sprintf(mvname,"%s/%s/%s",PATHNAME,"move",filename);
  }
 }
 /* ret = rename(fullfilename,mvname);*/
 if (ret == -1)
 {
  sprintf(errmsg,"<ERROR> move %s to %s errorn error is: %sn",fullfilename,mvname,strerror(errno));
  WriteMsg(flogp,errmsg);
 }else
 {
  sprintf(msg,"<OK> move %s to %s OK ",fullfilename,mvname);
  WriteMsg(flogp,msg);
 }
 a_thread[thread_num]=0;
 pthread_exit(NULL);
}
char* getTime()
{
 time_t timer;
 struct tm *tim;
 int hour;
 char now_time[20];
 timer = time(NULL);
 tim = gmtime(&timer);
 hour = tim->tm_hour;
 sprintf(now_time, "%04d-%02d-%02d-%02d:%02d:%02d", tim->tm_year,
  tim->tm_mon+1, tim->tm_mday, hour,
  tim->tm_min, tim->tm_sec);
 return now_time;
}
void WriteMsg(FILE *fp, char *msg)
{
 fprintf(fp,"--- ");
 fprintf(fp,"%sn",msg);
 fflush(fp);
}


用户评论