Home > Articles > Programming > C/C++

  • Print
  • + Share This
Like this article? We recommend

Like this article? We recommend

Primary Components

In our example, all of the files to be analyzed are on a single computer (node) of the cluster. We know which computer they’re on, but we don’t know where they are. So the utility has to search the entire computer for the files that fit our search criteria. Considering that any one node on the cluster has several hundred gigabytes of file storage, locating the files that meet our search criteria requires work. Since all of the files are on a single machine, and the nodes don’t share a filesystem, the rest of the nodes on the cluster are not involved in the search.

Table 1 describes the nine basic components of the text analysis utility.

Table 1 Basic components of the text analysis utility.

Component

Description

A

Locates files that match the initial search criteria.

B

Parses located files into containers.

C

Distributes work to nodes of the cluster.

D

Performs text analysis.

E

Receives results from nodes of the cluster.

F

list<string> ListFiles contains the filenames of the files to be searched.

G

list<string> Document contains the tokens for a given file.

H

The analysis object contains the result of a single text-file analysis.

I

list<analysis> AnalysisResult contains a list of analysis objects.

Components A and E are multithreaded. That is, the work that components A and E does is divided into a series of threads that can be executed in parallel. In component A, we use multithreading to speed up the search for the files that meet our initial search criteria. In component E, the manager node has assigned a listener thread to each worker node. As each worker node completes an analysis, it sends the results back to the manager node, which routes the results to the appropriate listener thread.

Keep in mind that threads are lightweight processes. Compared to a process, a thread requires less operating system overhead. Threads are easier than processes to create, manipulate, and maintain. In this utility, multithreading is accomplished through the use of the Pthread Library, which supplies an API to create and manage the threads in an application. The Pthread Library is based on a standardized programming interface for the creation and maintenance of threads. The thread interface has been specified by the IEEE standards committee in the POSIX 1003.1c standard.

The Pthread Library contains more than 60 functions, which can be divided into three categories. Table 2 shows the categories and functional breakdown.

Table 2 Three categories of the Pthread Library functions.

Thread Management Functions

Mutex Functions

Condition Variable Functions

Thread configuration

Thread cancellation

Thread scheduling

Thread-specific data signals

Thread attribute functions:

Thread attribute configuration

Thread attribute stack configuration

Thread attribute scheduling configuration

Mutex configuration

Priority management

Mutex attribute functions:

Mutex attribute configuration

Mutex attribute protocol

Mutex attribute priority management

Condition variable configuration

Condition variable attribute functions:

Condition variable attribute configuration

Condition variable sharing functions

Component A of the utility searches the root directory for other directories. If 5 directories are found, 5 threads are created; if 10,000 directories are found, 10,000 threads are created; and so on. Obviously, the maximum number of threads possible is dictated by the available operating system resources . The sysconf() function can be called to determine the threads available by using the following syntax:

#include <unistd.h>

int main(int argc,char *argv[])
{
  ...
  sysconf(_SC_THREAD_THREADS_MAX);
  ...
}

One thread is assigned to search each directory and its subdirectories. The thread searches its directories recursively, looking only for filenames that meet the initial search criteria. So if there were 20 root directories to start with, there would be 20 threads executing in parallel, searching each subdirectory for files that matched the initial search criteria. Once a filename is found, the thread places the complete filename (including path) into a list<string> container.

Listing 1 shows the source code that each thread uses to search the directories.

Listing 1 Code for searching directories.

 1
 2  #include <dirent.h>
 3
 4  #include <limits.h>
 5  #include <unistd.h>
 6  #include <sys/types.h>
 7  #include <sys/stat.h>
 8  #include <pthread.h>
 9  #include "direct-files.h"
 10  #include <iostream>
 11  #include <string>
 12  #include <vector>
 13  #include <list>
 14
 15
 16  pthread_mutex_t ListFileMutex = PTHREAD_MUTEX_INITIALIZER;
 17
 18  #ifndef PATH_MAX
 19  const int PATH_MAX = 300;
 20  #endif
 21
 22
 23  extern vector<string> ListDirectories;
 24  extern vector<string> ListFiles;
 25  int filecount = 0;
 26  int dircount = 0;
 27
 28  int isDirectory(const char *FileName)
 29  {
 30  struct stat StatBuffer;
 31
 32  lstat(FileName, &StatBuffer);
 33  if((StatBuffer.st_mode & S_IFMT) == -1)
 34  {
 35   cerr << "could not get stats on file" << endl;
 36   return (0);
 37  }
 38  else{
 39    if((StatBuffer.st_mode & S_IFMT) == S_IFDIR){
 40     return (1);
 41    }
 42    else{
 43      return (0);
 44    }
 45  }
 46  }
 47
 48
 49  int isRegular(string FileName,uid_t ID,off_t size)
 50  {
 51  struct stat StatBuffer;
 52
 53  lstat(FileName.c_str(),&StatBuffer);
 54  if((StatBuffer.st_mode & S_IFMT) == S_IFDIR)
 55  {
 56   cerr << "could not get stats on file" << endl;
 57   return (0);
 58  }
 59  else{
 60    if((StatBuffer.st_mode & S_IFMT) == S_IFREG) {
 61     if((StatBuffer.st_uid == ID) && (StatBuffer.st_size <= size)){
 62      return (1);
 63     }
 64    }
 65    else{
 66      return (0);
 67    }
 68  }
 69  return (0);
 70  }
 71
 72
 73  void listFiles(const char *CurrentDir,uid_t ID,off_t Size)
 74  {
 75  DIR *DirP;
 76  string FileName;
 77  string Temp;
 78  struct dirent *EntryP;
 79  char Name[PATH_MAX +1];
 80
 81  chdir(CurrentDir);
 82  DirP = opendir(CurrentDir);//open the directory
 83
 84  if(DirP == NULL){
 85   cerr << "could not open Directory: " << endl;
 86   cerr << CurrentDir << endl;
 87   return;
 88  }
 89
 90  EntryP = readdir(DirP); //read the directory
 91
 92  while(EntryP != NULL)
 93  {
 94   Temp.erase();
 95   Temp.assign(EntryP->d_name);
 96   if((Temp != ".") && (Temp != "..")){
 97    FileName.assign(CurrentDir);
 98    FileName.append(1,’/’);
 99    FileName.append(Temp);
100    if(isDirectory(FileName.c_str())){
101    listFiles(FileName.c_str(),ID,Size);
102    }
103    else{
104      if(isRegular(FileName.c_str(),ID,Size)){
105     pthread_mutex_lock(&ListFileMutex);
106      ListFiles.push_back(FileName);
107     pthread_mutex_unlock(&ListFileMutex);
108      filecount++;
109      }
110    }
111   }
112   EntryP = readdir(DirP); //read the next file
113
114  }
115  closedir(DirP);
116  }
117
118
119
120
121  void listDirectories(const char *CurrentDir)
122  {
123  DIR *DirP;
124  string FileName;
125  string Temp;
126  struct dirent *EntryP;
127  char Name[PATH_MAX +1];
128
129
130  chdir(CurrentDir);
131  DirP = opendir(CurrentDir);//open the directory
132
133  if(DirP == NULL){
134   cerr << "could not open directory in ListDirectories()" << endl;
135   return;
136  }
137
138  EntryP = readdir(DirP); //read the directory
139
140  while(EntryP != NULL)
141  {
142   Temp.erase();
143   Temp.assign(EntryP->d_name);
144   if((Temp != ".") && (Temp != "..")){
145    FileName.erase();
146    FileName = CurrentDir;
147    FileName.append(1,’/’);
148    FileName.append(Temp);
149    if(isDirectory(FileName.c_str())){
150     ListDirectories.push_back(FileName);
151     dircount++;
152    }
153   }
154   EntryP = readdir(DirP); //read the next file
155
156  }
157  closedir(DirP);
158  }
159
160
161
162
163

Notice in line 99 of Listing 1 that once a file is located it’s placed in a container named ListFiles.

To have threads execute the listFiles() function in Listing 1, it’s first necessary to create the thread and assign the thread a thread function. Threads are created and assigned their thread function using the following:

pthread_create() function

Table 3 shows the arguments and their descriptions for the pthread_create() function.

Table 3 Arguments and descriptions for the pthread_create() function.

Function/Parameters

Description

int

pthread_create(pthread_t *restrict thread,

const pthread_attr_t *restrict attr,

void *(*start_routine)(void*),

void *restrict arg);

Creates a new thread in the address space of a process. If the function creates the thread successfully, it will return the thread ID and store the value in the thread parameter.

pthread_t *restrict thread

Points to a thread handle or thread ID of the thread that will be created.

const pthread_attr_t *restrict attr

An attribute object that contains the attributes of the newly created thread.

void *(*start_routine)(void*)

The function that will be immediately executed by the newly created thread.

void *restrict arg

Contains the arguments for the function start_routine.

The thread function traverse() is shown in Listing 2.

Listing 2 The traverse() function.

 1  ...
 2
 3  void *traverse(void *Arg)
 4  {
 5
 6  lookup_key *LookupKey;
 7  LookupKey = static_cast<lookup_key *>(Arg);
 8  listFiles(LookupKey->DirectoryName.c_str(),LookupKey->Uid,SearchSize);
 9  delete LookupKey;
10  pthread_exit(NULL);
11  return(NULL);
12  }
13
14  ...

The traverse() function calls the listFiles() function, which actually does the recursive directory search.

The primary objects that the utility manipulates are components F thru I (refer to Table 1). Our goal is to move these objects between nodes on the cluster in a straightforward and simple fashion. Since components F thru I are compound objects, transmitting them between nodes in the cluster requires some work. We used the interfaces classes discussed in parts 1 and 2 of this series to simplify and streamline the work required. Of particular interest was how the list<string> Document and list<analysis> AnalysisResult objects were transmitted between nodes in the cluster. For example, the analysis object is a compound object:

struct analysis{
 int SensitiveOccurrence;
 string FileName;
 list<string> MonitoredTokens;
 list<string> MissingTokens;
 void reset(void);
};

The analysis object consists of a string object and two list objects. Using pvm_send() or mpi_send() routines to transmit analysis objects between nodes would be very messy. It would also fog up the logic of components C and E. Component C is implemented by the distributeWork() function shown in Listing 3 and component E is implemented by the getAnalysis() function shown in Listing 4.

Listing 3 The distributeWork() function.

 1  void *distributeWork(void *Arg)
 2  {
 3
 4  int N;
 5  string FileName;
 6  int ReturnCode = 0;
 7  int *Value;
 8   Value = static_cast<int *>(Arg);
 9  opvm_stream Destination[NumProcs];
10  for(N = 0;N <TaskId.size();N++)
11  {
12    Destination[N].messageId(1);
13    Destination[N].taskId(TaskId[N]);
14    Destination[N] << WorkLoad;
15  }
16  int CurrentFile;
17  CurrentFile = 0;
18  if(FileQueue.size() > 0){
19   while(FileQueue.size() >= *Value)
20   {
21    for(N = 0; N < WorkSize;N++)
22    {
23     FileName.assign(FileQueue.front());
24     FileQueue.pop_front();
25     if((getFile(FileName)) && (Document.size() > 0)){
26      Destination[N] << FileName;
27       Destination[N] << Document;
28      Document.erase(Document.begin(),Document.end());
29    }
30
31    }
32
33   }
34
35   }
36   pthread_exit(NULL);
37  }

Lines 26 and 27 in Listing 3 show how sending the Document object is simplified by use of overloading the << insertor operator.

Listing 4 The getAnalysis() function.

 1  void *getAnalysis(void *Arg)
 2  {
 3
 4   int N;
 5   int Nth;
 6   ipvm_stream Source;
 7   analysis Result;
 8   for(N =0;N < ListFiles.size();N++)
 9   {
10    for(Nth = 0;Nth < ThreadId.size();Nth++)
11    {
12
13     Source.messageId(1);
14     Source.taskId(ThreadId[Nth]);
15     pthread_mutex_lock(&AnalysisMutex);
16     Source >> Result;
17     AnalysisResult.push_back(Result);
18     pthread_mutex_unlock(&AnalysisMutex);
19
20    }
21
22   }
23   return(NULL);
24  }

Line 16 in Listing 4 is used to receive an analysis object from a worker node. This is another example of the simplification that can take place when interface classes are used properly. It’s important to note that multiple threads have the potential of inserting analysis objects into the AnalysisResult list container at the same time. This makes AnalysisResult one of the critical sections in the multithreaded processing that the utility does. On lines 15 and 18 in Listing 4, we use a mutex to regulate access to that critical section. The distributeWork() function in Listing 3 and the getAnalysis() function in Listing 4 are executed by threads that belong to the manager node. The distributeWork() function gives the worker node the actual work to be done, and the getAnalysis() function receives the analysis results from the worker nodes and places those results in a list<analysis> AnalysisResult container.

  • + Share This
  • 🔖 Save To Your Account