Home > Articles

  • Print
  • + Share This
This chapter is from the book

4.11 Dividing Your Program into Multiple Threads

Earlier in this chapter we discussed the delegation of work according to a specific strategy or approach called a thread model. Those thread models were:

  • delegation (boss–worker)

  • peer-to-peer

  • pipeline

  • producer–consumer

Each model has its own WBS (Work Breakdown Structure) that determines who is responsible for thread creation and under what conditions threads are created. In this section we will show an example of a program for each model using Pthread library functions.

4.11.1 Using the Delegation Model

We discussed two approaches that can be used to implement the delegation approach to dividing a program into threads. To recall, in the delegation model, a single thread (boss) creates the threads (workers) and assigns each a task. The boss thread delegates the task each worker thread is to perform by specifying a function. With one approach, the boss thread creates threads as a result of requests made to the system. The boss thread processes each type of request in an event loop. As events occur, thread workers are created and assigned their duties. Example 4.5 shows the event loop in the boss thread and the worker threads in pseudocode.

Example 4.5 Approach 1: Skeleton program of boss and worker thread model.

//...
pthread_mutex_t Mutex = PTHREAD_MUTEX_INITIALIZER
int AvailableThreads
pthread_t Thread[Max_Threads]
void decrementThreadAvailability(void)
void incrementThreadAvailability(void)
int threadAvailability(void);


// boss thread
{
   //...
   if(sysconf(_SC_THREAD_THREADS_MAX) > 0){
      AvailableThreads = sysconf(_SC_THREAD_THREADS_MAX)
   }
   else{
          AvailableThreads = Default
   }

   int Count = 1;

   loop while(Request Queue is not empty)
      if(threadAvailability()){
         Count++
         decrementThreadAvailability()
         classify request
         switch(request type)
         {
            case X : pthread_create(&(Thread[Count])...taskX...)
            case Y : pthread_create(&(Thread[Count])...taskY...)
            case Z : pthread_create(&(Thread[Count])...taskZ...)
            //...
         }
      }
      else{
              //free up thread resources
      }
   end loop
}

void *taskX(void *X)
{
   // process X type request
   incrementThreadAvailability()
   return(NULL)
}

void *taskY(void *Y)
{
   // process Y type request
   incrementThreadAvailability()
   return(NULL)
}

void *taskZ(void *Z)
{
   // process Z type request
   decrementThreadAvailability()
   return(NULL)
}

//...

In Example 4.5, the boss thread dynamically creates a thread to process each new request that enters the system, but there are a maximum number of threads that will be created. There are n number of tasks to process n request types. To be sure the maximum number of threads per process will not be exceeded, these additional functions can be defined:

threadAvailability()
incrementThreadAvailability()
decrementThreadAvailability()

Example 4.6 shows pseudocode for these functions.

Example 4.6 Functions that manage thread availability count.

void incrementThreadAvailability(void)
{
   //...
   pthread_mutex_lock(&Mutex)
   AvailableThreads++
   pthread_mutex_unlock(&Mutex)
}

void decrementThreadAvailability(void)
{
   //...
   pthread_mutex_lock(&Mutex)
   AvailableThreads—
   pthread_mutex_unlock(&Mutex)
}

int threadAvailability(void)
{
   //...
   pthread_mutex_lock(&Mutex)
   if(AvailableThreads > 1)
      return 1
   else
      return 0
   pthread_mutex_unlock(&Mutex)
}

The threadAvailability() function will return 1 if the maximum number of threads allowed per process has not been reached. This function accesses a global variable ThreadAvailability that stores the number of threads still available for the process. The boss thread calls the decrementThreadAvailability() function, which decrements the global variable before the boss thread creates a thread. The worker threads call incrementThreadAvailability(), which increments the global variable before a worker thread exits. Both functions contain a call to pthread_mutex_lock() before accessing the variable and a call to pthread_mutex_unlock() after accessing the global variable. If the maximum number of threads are exceeded, then the boss thread can cancel threads if possible or spawn another process, if necessary. taskX(), taskY(), and taskZ() execute code that processes their type of request.

The other approach to the delegation model is to have the boss thread create a pool of threads that are reassigned new requests instead of creating a new thread per request. The boss thread creates a number of threads during initialization and then each thread is suspended until a request is added to the queue. The boss thread will still contain an event loop to extract requests from the queue. But instead of creating a new thread per request, the boss thread signals the appropriate thread to process the request. Example 4.7 shows the boss thread and the worker threads in pseudocode for this approach to the delegation model.

Example 4.7 Approach 2: Skeleton program of boss and worker thread model.

//...

pthread_t Thread[N]

// boss thread
{

    pthread_create(&(Thread[1]...taskX...);
    pthread_create(&(Thread[2]...taskY...);
    pthread_create(&(Thread[3]...taskZ...);
    //...

    loop while(Request Queue is not empty
       get request
       classify request
       switch(request type)
       {
           case X :
                    enqueue request to XQueue
                    signal Thread[1]

           case Y :
                    enqueue request to YQueue
                    signal Thread[2]

           case Z :
                    enqueue request to ZQueue
                    signal Thread[3]
           //...
       }

   end loop
}

void *taskX(void *X)
{
   loop
       suspend until awaken by boss
       loop while XQueue is not empty
          dequeue request
          process request

       end loop
   until done
{

void *taskY(void *Y)
{
   loop
       suspend until awaken by boss
       loop while YQueue is not empty
          dequeue request
          process request
       end loop
   until done
}

void *taskZ(void *Z)
{
   loop
       suspend until awaken by boss
       loop while (ZQueue is not empty)
          dequeue request
          process request
       end loop
   until done
}

//...

In Example 4.7, the boss thread creates N number of threads, one thread for each task to be executed. Each task is associated with processing a request type. In the event loop, the boss thread dequeues a request from the request queue, determines the request type, enqueues the request to the appropriate request queue, then signals the thread that processes the request in that queue. The functions also contain an event loop. The thread is suspended until it receives a signal from the boss that there is a request in its queue. Once awakened, in the inner loop, the thread processes all the requests in the queue until it is empty.

4.11.2 Using the Peer-to-Peer Model

In the peer-to-peer model, a single thread initially creates all the threads needed to perform all the tasks called peers. The peer threads process requests from their own input stream. Example 4.8. shows a skeleton program of the peer-to-peer approach of dividing a program into threads.

Example 4.8 Skeleton program using the peer-to-peer model

//...

pthread_t Thread[N]

// initial thread
{

    pthread_create(&(Thread[1]...taskX...);
    pthread_create(&(Thread[2]...taskY...);
    pthread_create(&(Thread[3]...taskZ...);
    //...

  }

void *taskX(void *X)
{
    loop while (Type XRequests are available)
          extract Request
          process request
    end loop
    return(NULL)
}

//...

In the peer-to-peer model, each thread is responsible for its own stream of input. The input can be extracted from a database, file, and so on.

4.11.3 Using the Pipeline Model

In the pipeline model, there is a stream of input processed in stages. At each stage, work is performed on a unit of input by a thread. The input continues to move to each stage until the input has completed processing. This approach allows multiple inputs to be processed simultaneously. Each thread is responsible for producing its interim results or output, making them available to the next stage or next thread in the pipeline. Example 4.9 shows the skeleton program for the pipeline model.

Example 4.9 Skeleton program using the pipeline model.

//...

   pthread_t Thread[N]
   Queues[N]

   // initial thread
   {
       place all input into stage1's queue
       pthread_create(&(Thread[1]...stage1...);
       pthread_create(&(Thread[2]...stage2...);
       pthread_create(&(Thread[3]...stage3...);
       //...
    }

void *stageX(void *X)
{
   loop
     suspend until input unit is in queue
     loop while XQueue is not empty
         dequeue input unit
         process input unit
         enqueue input unit into next stage's queue
      end loop
   until done
   return(NULL)
}

//...

In Example 4.9, N queues are declared for N stages. The initial thread enqueues all the input into stage 1's queue. The initial thread then creates all the threads needed to execute each stage. Each stage has an event loop. The thread sleeps until an input unit has been enqueued. The inner loop continues to iterate until its queue is empty. The input unit is dequeued, processed, then that unit is then enqueued into the queue of the next stage.

4.11.4 Using the Producer–Consumer Model

In the producer-consumer model, the producer thread produces data consumed by the consumer thread or threads. The data is stored in a block memory shared between the producer and consumer threads. This model was used in Programs 4.5, 4.6, and 4.7. Example 4.10 shows the skeleton program for the producer-consumer model.

Example 4.10 Skeleton program using the producer–consumer model.

pthread_mutex_t Mutex = PTHREAD_MUTEX_INITIALIZER
pthread_t Thread[2]
Queue

// initial thread
{
    pthread_create(&(Thread[1]...producer...);
    pthread_create(&(Thread[2]...consumer...);
    //...
 }

void *producer(void *X)
{
   loop
      perform work
        pthread_mutex_lock(&Mutex)
         enqueue data
      pthread_mutex_unlock(&Mutex)
         signal consumer
      //...
   until done
}

void *consumer(void *X)
{
   loop
      suspend until signaled
      loop while(Data Queue not empty)
          pthread_mutex_lock(&Mutex)
           dequeue data
       pthread_mutex_unlock(&Mutex)
          perform work
      end loop
   until done
}

In Example 4.9, an initial thread creates the producer and consumer threads. The producer thread executes a loop in which it performs work then locks a mutex on the shared queue in order to enqueue the data it has produced. The producer unlocks the mutex then signals the consumer thread that there is data in the queue. The producer iterates through the loop until all work is done. The consumer thread also executes a loop in which it suspends itself until it is signaled. In the inner loop, the consumer thread processes all the data until the queue is empty. It locks the mutex on the shared queue before it dequeues any data and unlocks the mutex after the data has been dequeued. It then performs work on that data. In Program 4.6, the consumer thread enqueues its results to a file. The results could have been inserted into another data structure. This is often done by consumer threads in which it plays both the role of consumer and producer. It plays the role of consumer of the unprocessed data produced by the producer thread, then it plays the role of producer when it processes data stored in another shared queue consumed by another thread.

4.11.5 Creating Multithreaded Objects

The delegation, peer-to-peer, pipeline. and producer–consumer models demonstrate approaches to dividing a program into multiple threads along function lines. When using objects, member functions can create threads to perform multiple tasks. Threads can be used to execute code on behalf of the object: free-floating functions and other member functions.

In either case, the threads are declared within the object and created by one of the member functions (e.g., the constructor). The threads can then execute some free-floating functions (function defined outside the object), which invokes member functions of the object that are global. This is one approach to making an object multithreaded. Example 4.10 contains an example of a multithreaded object.

Example 4.10 Declaration and definition of multithreading an object.

#include <pthread.h>
#include <iostream>
#include <unistd.h>

void *task1(void *);
void *task2(void *);

class multithreaded_object
{
   pthread_t Thread1,Thread2;
public:

   multithreaded_object(void);
   int c1(void);
   int c2(void);
   //...
};

multithreaded_object::multithreaded_object(void)
{

   //...
   pthread_create(&Thread1,NULL,task1,NULL);
   pthread_create(&Thread2,NULL,task2,NULL);
   pthread_join(Thread1,NULL);
   pthread_join(Thread2,NULL);
   //...

}

int multithreaded_object::c1(void)
{
   // do work
   return(1);
}

int multithreaded_object::c2(void)
{
   // do work
return(1);
}

multithreaded_object MObj;

void *task1(void *)
{
   //...
   MObj.c1();
   return(NULL);
}

void *task2(void *)
{
   //...
   MObj.c2();
   return(NULL);
}

In Example 4.10, the class multithread_object declares two threads. From the constructor of the class, the threads are created and joined. Thread1 executes task1 and Thread2 executes task2. task1 and task2, then invokes member functions of the global object MObj.

  • + Share This
  • 🔖 Save To Your Account