4.8. Producer-Consumer Problem and the Queue/queue Module

The final example illustrates the producer-consumer scenario, in which a producer of goods or services creates goods and places them in a data structure such as a queue. The time between the production of goods is non-deterministic, as is the time the consumer takes to consume them.

We use the Queue module (Python 2.x; renamed to queue in version 3.x) to provide an interthread communication mechanism that allows threads to share data with each other. In particular, we create a queue into which the producer (thread) places new goods and the consumer (thread) consumes them. Table 4-5 itemizes the various attributes that can be found in this module.

Table 4-5. Common Queue/queue Module Attributes

| Attribute | Description |
| --- | --- |
| Queue/queue Module Classes | |
| Queue(maxsize=0) | Creates a FIFO queue of the given maxsize, where inserts block until there is more room; if maxsize is omitted, the queue is unbounded |
| LifoQueue(maxsize=0) | Creates a LIFO queue of the given maxsize, where inserts block until there is more room; if maxsize is omitted, the queue is unbounded |
| PriorityQueue(maxsize=0) | Creates a priority queue of the given maxsize, where inserts block until there is more room; if maxsize is omitted, the queue is unbounded |
| Queue/queue Exceptions | |
| Empty | Raised when a get*() method is called on an empty queue |
| Full | Raised when a put*() method is called on a full queue |
| Queue/queue Object Methods | |
| qsize() | Returns the queue size (approximate, because the queue may be updated by other threads) |
| empty() | Returns True if the queue is empty, False otherwise |
| full() | Returns True if the queue is full, False otherwise |
| put(item, block=True, timeout=None) | Puts item in the queue; if block is True (the default) and timeout is None, blocks until room is available; if timeout is positive, blocks at most timeout seconds and then raises the Full exception; if block is False, raises Full immediately when there is no room |
| put_nowait(item) | Same as put(item, False) |
| get(block=True, timeout=None) | Gets an item from the queue; if block is True (the default), blocks until an item is available, or at most timeout seconds if timeout is given, after which Empty is raised; if block is False, raises Empty immediately when the queue is empty |
| get_nowait() | Same as get(False) |
| task_done() | Indicates that work on an enqueued item is complete; used with join() below |
| join() | Blocks until all items in the queue have been processed and signaled by a call to task_done() above |
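
As a brief illustration of the attributes in Table 4-5, the following sketch (written for Python 3, where the module is named queue) shows how the three queue classes order their items, how a non-blocking put raises Full, and how task_done() pairs with join(). The helpers fill(), drain(), and worker() are hypothetical names introduced only for this example; they are not part of the module.

```python
import threading
from queue import Queue, LifoQueue, PriorityQueue, Empty, Full

def fill(q, values):
    """Put each value into q and return q (hypothetical helper)."""
    for v in values:
        q.put(v)
    return q

def drain(q):
    """Remove and return every item currently in q (hypothetical helper)."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except Empty:          # raised once the queue has no more items
            return items

fifo = drain(fill(Queue(), [3, 1, 2]))          # [3, 1, 2]: insertion order
lifo = drain(fill(LifoQueue(), [3, 1, 2]))      # [2, 1, 3]: newest first
prio = drain(fill(PriorityQueue(), [3, 1, 2]))  # [1, 2, 3]: smallest first

# A bounded queue raises Full when a non-blocking put finds no room.
small = Queue(maxsize=1)
small.put_nowait('a')
try:
    small.put_nowait('b')
    overflowed = False
except Full:
    overflowed = True

# task_done()/join(): join() returns once every enqueued item is marked done.
jobs = Queue()
done = []

def worker():
    while True:
        item = jobs.get()
        done.append(item * 2)
        jobs.task_done()       # one call per item retrieved with get()

# daemon=True so the endless worker loop does not keep the program alive
threading.Thread(target=worker, daemon=True).start()
for n in (1, 2, 3):
    jobs.put(n)
jobs.join()

print(fifo, lifo, prio, overflowed, done)
```

Under Python 2.x the import would come from the Queue module instead, as noted earlier.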

We’ll use Example 4-12 (prodcons.py) to demonstrate the producer-consumer scenario with Queue/queue. The following is the output from one execution of this script:

$ prodcons.py
starting writer at: Sun Jun 18 20:27:07 2006
producing object for Q... size now 1
starting reader at: Sun Jun 18 20:27:07 2006
consumed object from Q... size now 0
producing object for Q... size now 1
consumed object from Q... size now 0
producing object for Q... size now 1
producing object for Q... size now 2
producing object for Q... size now 3
consumed object from Q... size now 2
consumed object from Q... size now 1
writer finished at: Sun Jun 18 20:27:17 2006
consumed object from Q... size now 0
reader finished at: Sun Jun 18 20:27:25 2006
all DONE

Example 4-12. Producer-Consumer Problem (prodcons.py)

This implementation of the Producer–Consumer problem uses Queue objects and a random number of goods produced (and consumed). The producer and consumer are individually—and concurrently—executing threads.

1    #!/usr/bin/env python
2
3    from random import randint
4    from time import sleep
5    from Queue import Queue
6    from myThread import MyThread
7
8    def writeQ(queue):
9        print 'producing object for Q...',
10       queue.put('xxx', 1)
11       print "size now", queue.qsize()
12
13   def readQ(queue):
14       val = queue.get(1)
15       print 'consumed object from Q... size now', \
16               queue.qsize()
17
18   def writer(queue, loops):
19       for i in range(loops):
20           writeQ(queue)
21           sleep(randint(1, 3))
22
23   def reader(queue, loops):
24       for i in range(loops):
25           readQ(queue)
26           sleep(randint(2, 5))
27
28   funcs = [writer, reader]
29   nfuncs = range(len(funcs))
30
31   def main():
32       nloops = randint(2, 5)
33       q = Queue(32)
34
35       threads = []
36       for i in nfuncs:
37           t = MyThread(funcs[i], (q, nloops),
38               funcs[i].__name__)
39           threads.append(t)
40
41       for i in nfuncs:
42           threads[i].start()
43
44       for i in nfuncs:
45           threads[i].join()
46
47       print 'all DONE'
48
49   if __name__ == '__main__':
50       main()

As you can see, the producer and consumer do not necessarily alternate in execution. (Thank goodness for random numbers!) Seriously, though, real life is generally random and non-deterministic.

Line-by-Line Explanation

Lines 1–6

In this module, we use the Queue.Queue object as well as our thread class myThread.MyThread, seen earlier. We use random.randint() to make production and consumption somewhat varied. (Note that random.randint() works just like random.randrange() but is inclusive of the upper/end value.)
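
The difference between the two functions can be checked with a small sketch. The seed and the number of draws are arbitrary choices made for this illustration.

```python
import random

random.seed(0)  # arbitrary seed, only to make repeated runs identical

# Collect the distinct values each function produces over many draws.
randint_values = {random.randint(1, 3) for _ in range(1000)}
randrange_values = {random.randrange(1, 3) for _ in range(1000)}

print(sorted(randint_values))    # includes the upper bound 3
print(sorted(randrange_values))  # stops short of the upper bound
```

So sleep(randint(1, 3)) in the writer can pause for 1, 2, or 3 seconds.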

Lines 8–16

The writeQ() and readQ() functions each have a specific purpose: to place an object in the queue—we are using the string 'xxx', for example—and to consume a queued object, respectively. Notice that we are producing one object and reading one object each time.

Lines 18–26

The writer() runs as a single thread whose sole purpose is to produce an item for the queue, wait for a bit, and then do it again, up to the specified number of times, chosen randomly per script execution. The reader() does likewise, except that it consumes an item rather than producing one.

You will notice that the random number of seconds the writer sleeps is, in general, shorter than the amount of time the reader sleeps. This is to discourage the reader from trying to take items from an empty queue. Because queue.get(1) blocks, an empty queue would not cause an error; the reader would simply wait. Giving the writer a shorter waiting period makes it more likely that there will already be an object for the reader to consume by the time its turn rolls around again.
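
To see what a consumer experiences when it does meet an empty queue, consider this minimal Python 3 sketch. The 0.2-second timeout and the 0.1-second delay are arbitrary values chosen for the demonstration, and threading.Timer stands in for a slow producer.

```python
import threading
import time
from queue import Queue, Empty

q = Queue()

# Case 1: nothing is ever produced, so a get() with a timeout gives up.
start = time.monotonic()
try:
    q.get(timeout=0.2)
    timed_out = False
except Empty:
    timed_out = True
waited = time.monotonic() - start

# Case 2: a late producer; a plain blocking get() waits until the item arrives.
late_producer = threading.Timer(0.1, q.put, args=('xxx',))
late_producer.start()
item = q.get()            # blocks until the timer thread enqueues 'xxx'
late_producer.join()

print('timed out:', timed_out, '| item received:', item)
```

The reader in prodcons.py corresponds to the second case: it blocks without a timeout, so it can be delayed by an empty queue but never fails because of one.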

Lines 28–29

These are just setup lines: they list the functions that are to be spawned and executed as threads, and build the index range used to create, start, and join them.

Lines 31–47

Finally, we have our main() function, which should look quite similar to the main() in all of the other scripts in this chapter. We create the appropriate threads and send them on their way, finishing up when both threads have concluded execution.

We infer from this example that a program that has multiple tasks to perform can be organized to use separate threads for each of the tasks. This can result in a much cleaner program design than a single-threaded program that attempts to do all of the tasks.
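
For readers on Python 3, the script can be approximated as follows. This is a sketch rather than the book's listing: it imports from queue instead of Queue, substitutes the standard threading.Thread for the myThread.MyThread class, and introduces two assumptions of its own, a SCALE constant that shortens the sleeps so the demonstration finishes quickly and a consumed list that records what the reader received.

```python
import threading
from queue import Queue
from random import randint
from time import sleep

SCALE = 0.01  # assumption: shrink the 1-5 second sleeps for a fast demo

def writer(q, loops):
    for _ in range(loops):
        q.put('xxx')                  # blocks only if the queue is full
        print('producing object for Q... size now', q.qsize())
        sleep(randint(1, 3) * SCALE)

def reader(q, loops, consumed):
    for _ in range(loops):
        consumed.append(q.get())      # blocks until an item is available
        print('consumed object from Q... size now', q.qsize())
        sleep(randint(2, 5) * SCALE)

def main():
    nloops = randint(2, 5)
    q = Queue(32)
    consumed = []                     # hypothetical record of items read

    threads = [
        threading.Thread(target=writer, args=(q, nloops), name='writer'),
        threading.Thread(target=reader, args=(q, nloops, consumed),
                         name='reader'),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()                      # wait for both threads to finish

    print('all DONE')
    return nloops, consumed, q

if __name__ == '__main__':
    main()
```

As in the original, the order of the "producing" and "consumed" lines varies from run to run, but the reader always ends up with exactly as many items as the writer produced.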

In this chapter, we illustrated how a single-threaded process can limit an application’s performance. In particular, programs with independent, non-deterministic, and non-causal tasks that execute sequentially can be improved by division into separate tasks executed by individual threads. Not all applications will benefit from multithreading: threads add overhead, and the global interpreter lock in the standard Python interpreter (CPython) allows only one thread to execute Python bytecode at a time. Even so, you are now more cognizant of Python’s threading capabilities and can use this tool to your advantage when appropriate.
