Home > Articles > Open Source > Python

  • Print
  • + Share This
Like this article? We recommend

Code Galore

Of course, before the Stackless fun starts, we have to write the actual agent code. This is kept as simple as possible, especially for the first version. That is to say, I had an agent that did use all Aurea statistics, but I have simplified it for the purpose of this article.

Testing Comes First

It's an excellent custom to start writing tests, even before you start to write actual code. With Python 2.1, you can use the PyUnit unit-testing framework. (You can also get this separately.) By taking a look at the test code, we'll know what our agents are supposed to be doing: growing old, finding a partner, getting pregnant, partaking of the earth's valuable bounty. For all these aspects of an agent's life, a small test function is written. Note how we use a dummy class, FakeWorld, to fully test the agent. See worldtest.py:

#!/bin/env python
"""
 ********************************************************************
          worldtest.py - unit tests for world.py
               -------------------
  begin        : Fri May 25 09:02:12 CEST 2001
  copyright      : (C) 2001 by Boudewijn Rempt
  email        : boud@rempt.xs4all.nl
 ********************************************************************
 ********************************************************************
 *
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version.
*
 ********************************************************************
"""
import sys, unittest, uthread, time
from world import *

class FakeWorld:
  def __init__(self):
    self._bounty=1
    self.stopped=FALSE

  def append(self, child):
    pass

  def setBounty(self, bounty):
    self._bounty=bounty

  def bounty(self):
    return self._bounty

  def remove(self, agent):
    pass

  def debute(self, agent):
    pass

The implementation of FakeWorld depends on the implementation of Agent as demanded by the class AgentTestCase—a world must provide for functions to set the resources available to an agent, and to add children to its pool of running agents:

class AgentTestCase(unittest.TestCase):

  def setUp(self):
    self.world=FakeWorld()

Creating a world—even a fake one for testing—might be a lengthy process, and it would be nice to reuse the initialization code for all tests. The setup() function creates the fixtures that every test can use.

  def testInstantiation(self):
    a=Agent("testAgent", self.world)
    assert a != None, "Agent could not be instantiated"

  def testAgeOneYear(self):
    a=Agent("testAgent", self.world)
    age=a.age()
    a.ageOneYear()
    assert (age + 1) == a.age(), 'Agent could not age one year'

  def testGetPartner(self):
    a=Agent("Adam", self.world)
    b=Agent("Eve", self.world)
    accepted = a.propose(b)
    assert a.married()==accepted, 'Invalid state: married %i,
  accepted %i' % (a.married(), accepted)

  def testGetPregnant(self):
    a=Agent("Adam", self.world)
    b=Agent("Eve", self.world)

    while a.age() < 16:
      a.ageOneYear()
      b.ageOneYear()

    assert b.age() > 15, "Eve is not old enough"
    assert a.age() > 15, "Adam is not old enough"

    a.setSex(MALE)
    a.setSexualPreference(10) # 100% hetero
    b.setSex(FEMALE)
    b.setSexualPreference(10) # ditto
    a.marry(b)
    b.ageOneYear() # in one year you can get pregnant.
    assert len(b.children()) > 0, 'a did not impregnate b'

  def testProduction(self):
    a=Agent("testAgent", self.world)
    while a.isAlive():
      a.produceAndFeed()
      a.ageOneYear()
    if a.age() > MAX_LIFESPAN:
      assert a.riches() > 0,
   "Agent died of natural causes but should have starved"

    self.world.setBounty(-1)
    a=Agent("testAgent", self.world)
    while a.isAlive():
      a.produceAndFeed()
      a.ageOneYear()
    assert a.age() < MAX_LIFESPAN, "Agent should have starved"

  def testLive(self):
    a=Agent("Adam", self.world)
    thread=uthread.new(a.live)
    uthread.run()
    while (  len(uthread.microThreadsRunning()
        and a.age() < MAX_LIFESPAN):
      assert a.age() < MAX_LIFESPAN,
   "Agent should have quit running."

All tests have the prefix test, to facilitate the automatic generation of test suites. You can see that the tests are very simple: One or two agents are created, and after calling a method on the agent, the result is asserted to be what we expected.

The last test, testLive, will no doubt pique your curiosity. It's the first time we actually make use of the microthreads module, uthread.

The first version of this test simply called a.live()—however, during the implementation of the live() function, it became apparent that it would have to use some microthreading functionality. To enable that, it was necessary to actually start the live() as intended, from a microthread.

Doing so posed an interesting problem. Creating a new thread and asking the scheduler to run it is easy. However, after having started the thread, the run() immediately returns—finishing the test if we wouldn't wait for the thread to finish. This means that any exception thrown by the live() won't get caught by the testing framework. One solution is to wait until all threads have finished running. This can be tricky under more complex circumstances. For instance, when the environment itself starts some threads, the thread count cannot be relied upon. The following solution is neater:

def live(self, agent):
  agent.live()
  self.finished=TRUE

def testLive(self):
  self.finished=FALSE
  a=Agent("Adam", self.world)
  agentthread=uthread.new(self.live, a)
  uthread.run()
  while self.finished==FALSE: pass
  assert a.age() <= MAX_LIFESPAN, "Agent should have died
  earlier: %i" % a.age()

You can't raise assertion errors or fail() tests from within a microthread—they don't propagate to the main thread. The above solution works with a shared variable that tells the test function when the thread has finished.

Another solution would be to use a lock object that's acquired by the live() function and the testlive() function. This is even neater. Locks are the bread-and-butter objects of thread programming, and they don't work differently from the lock object in the regular Python threading module:

def live(self, agent, lck):
  lck.acquire()
  agent.live()
  lck.release()

def testLive(self):
  a=Agent("Adam", self.world)
  lck=uthread.Lock()
  t=uthread.new(self.live, a, lck)
  uthread.run()
  lck.acquire()

The final bit of testing code creates a test suite and starts the test runner:

def suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(AgentTestCase))
  return suite

if __name__=="__main__":
  runner = unittest.TextTestRunner()
  runner.run(suite())

You can also run the tests using a graphical test-runner program, but here the test suite runs in the console window. Creating test suites is an excellent way of defining the requirements of your software and of assuring that the software keeps doing what it was supposed to do.

Building the unit tests first has already shown us a lot about what we should implement, and incidentally has given us a gentle introduction to microthreads. Now it's time for the real implementation.

Implementing the Agent

As shown in the test case, an agent must be able to produce, feed, find a partner, marry, and get children. With these requirements I built the following Agent class:

class Agent:
  def __init__(self, identifier, world, *args):
    self._identifier=identifier
    self._maxAge=atomicRandRange(1,MAX_LIFESPAN)
    self._age=0
    self._sexualPreference=atomicRandRange(1,11)
    self._riches=atomicTimesD100()
    self._capacity=atomicTimesD4(3)
    self._need=atomicTimesD4(3)
    self._world=world
    self._mother=None
    self._father=None
    self._partner=None
    self._maxChildren=atomicRandRange(1, self._capacity)
    self._childCount=0
    self._children=[]
    self._ripeAge=atomicRandRange(14,20)
    self._sex=atomicRandRange(0,2)

There are a number of statistics that are determined randomly. To use the random generator, it has to be wrapped in the atomic(). This is because the Python random module uses some state variables that should not be changed by two threads at the same time. The moment an atomic() function is executed, only that function executes. Normally, a thread can switch context after each statement in a function; when called with atomic(), there will be no context switch until the whole function returns.

All random functions can be found the file dice.py, which is tested in the file dicetest.py. Here's an example:

def atomicRandRange(start, end, step=1):
  import random
  return uthread.atomic(random.randrange, start, end, step)

After this excursion, let's continue with the Agent code. There are a number of not-so-interesting functions that handle access to private variables and so on. Let's skip these; you can find them in the full source, in the file world.py.

More interesting are the following functions, which handle the actual life of an agent:

...

  def isAlive(self):
    if self._age > self._maxAge:
      return FALSE
    if self._capacity <= 0:
      return FALSE
    return TRUE

Currently, you die when your time has come, or when you don't have any strength left, because of malnutrition. I haven't simulated diseases!

Getting pregnant is a simple administrative issue—although a partner is necessary, and although he lends his identity to the child, he doesn't have to take an active role. Note that when the child has been born, the world object is being called to add the child to its list of inhabitants: the world is responsible for actually giving the child a thread to live in.

This is actually pretty important. If all you have are a few hundred threads, you can't give every agent its own thread. That means that you'll have to base your system on messages between objects, not on direct function calls. Messages can be queued and handled asynchronously—function calls are executed immediately. But it's a lot easier to just use function calls:

...
  def getPregnant(self):
    child=Agent(self.identity() + "." +
          self._partner.identity() +
          "(" + str(self._childCount + 1) + ")" ,
          self._world)
    child._mother=self
    child._father=self._partner
    self._world.append(child)
    self._children.append(child)
    self._childCount += 1

Let's skip a number of other functions, to arrive at the central loop of every agent: the live() function. This is what's run by the agent thread:

  def live(self):

    while self.isAlive() and self._world.stopped == FALSE:
      # It should take one year realtime to live
      timer = uthread.Timer(ONE_YEAR)
      timer.start()
      self.produceAndFeed()

      if self.age() == self._ripeAge:
        self._world.debute(self)

      if ( self.married()
         and self.sex()
         and self.age() < 40
         and self._childCount <= self._maxChildren):
        self.getPregnant()

      if (self._age > 14
        and self._riches > 10
        and self._partner == None
        and self.sex() == MALE):
        self._world.requestPartner(self)
      timer.wait() # wait until the year is over
      self.ageOneYear()
    print "%s died at age %i" % (self.identity(), self.age())
    print self
    self._world.remove(self)

One problem with simulations of this kind is the notion of time. Ideally, you'd want to have the simulation happen in sped-up real-time. To achieve this, I've defined a constant that represents how long (in seconds) a simulated year takes. The agent is free to do what she wants, but if she's finished, she has to wait until the next year before starting again. This is achieved with a microthread timer. The timer is set to one second and started. If the timer hasn't run out of time, many actions—and possibly context switches—later, the thread is blocked until the year is over.

Note that when the agent doesn't receive enough timeslices in a simulated year to complete all his actions, he'll start to lag. In a real simulation, this problem would have to be solved—but that's certainly not trivial.

Creating the World

Of course, there has to be a place for the agents to live. This is a fairly complicated affair. The World class keeps a registry of all agents, starts their threads, provides natural resources ("bounty") and performs basic matchmaking services.

I'll pick the highlights from the actual code for you; the full code is in the file world.py:

class World(QObject):

  def __init__(self, *args):
    apply(QObject.__init__,(self,) + args)
    self.stopped=FALSE
    self._agentRegistry=[]
    self._bounty=INITIAL_NUMBER_OF_AGENTS * INITIAL_RESOURCES
    self._born=0
    self._died=0
    self.partnerQueue=uthread.Queue()
    self._scheduler=uthread.getScheduler()

Some initialization work is done, but the agents are not yet created. I've made the world a descendant of a PyQt QObject. I've used the PyQt GUI toolkit to make a nice GUI for the simulation. By making the world a QObject, I can send asynchronous messages from the simulation model to the GUI. I'm a real fan of Phil Thompson's work, and I hope to finish a book on PyQt for Opendocs this summer. PyQt is available for Windows and Linux.

...
  def init(self):
    for i in range(INITIAL_NUMBER_OF_AGENTS):
      self._agentRegistry.append(Agent(str(uthread.uniqueId()), 
                    self))
    self.emit(PYSIGNAL("sigMessage"), (str(time.time()) +
                      " Agents created",))

The agent registry is seeded with a certain number of agents. I'm afraid that with the current simulation constraints, starting out with just Adam and Eve is not sufficient to populate the world! The chance of both dying before having attained maturity is far too high.

  def go(self):
    self._scheduler=uthread.getScheduler()
    for agent in self._agentRegistry:
      thread = uthread.newresistant(agent.live)
    self.emit(PYSIGNAL("sigMessage"), (str(time.time()) +
                      " Threads created",))
    uthread.run()

In the go() function, threads are created for all agents. These are exception-resistant threads: When an exception happens in another thread, these threads continue running. The function run() then starts all threads. There is also a function runAndContinue, which should start the threads and return immediately. However, it doesn't work; all threads receive one timeslice, and then stop. This is a bug... Note that I set the instance variable self._scheduler before starting the threads. This variable is later used to determine in which microthread scheduler the agents are running.

  def stop(self):
    self.stopped=TRUE

It's important to note that you cannot stop or kill threads. Not in the regular Python threading module, nor in the microthread module. The canonical way to stop threads is to check in the thread for the state of some global variable. See the function Agent.live(), where the while loop will stop if world.stopped is set to TRUE.

  def append(self, child):
    self.emit(PYSIGNAL("sigMessage"),
         (str(time.time()) + " Agent born " + 
          child.identity(),))
    self._born += 1
    self._agentRegistry.append(child)
    t=uthread.newresistant(child.live)

When a child is born, the mother calls this append() function. The world then adds the child to its registry, and also creates a new exception-resistant thread for the child. You don't have to call run() again: the thread will automatically become part of the current scheduler and will start receiving timeslices.

  def debute(self, agent):
    self.emit(PYSIGNAL("sigMessage"),
         (str(time.time()) + " Agent %s comes of age." %
          agent.identity(),))
    self.partnerQueue.put(agent)

  def requestPartner(self, agent):
    self.emit(PYSIGNAL("sigMessage"),
         (str(time.time()) + " Agent %s requests partner" %
          agent.identity(),) )
    partner=self.partnerQueue.get()

    if partner==agent:
      self.partnerQueue.put(partner)
      partner=self.partnerQueue.get()

    if not agent.propose(partner):
      self.emit(PYSIGNAL("sigMessage"),
         (str(time.time()) + " Agent %s refuses %s" %
          (agent.identity(),partner.identity()),) )
      self.partnerQueue.put(partner)
      return
    else:
      self.emit(PYSIGNAL("sigMessage"),
           ("%s: %s marries %s" %
            (str(time.time()),
             agent.identity(),
             partner.identity()),))

The way agents find each other is quite interesting, if a bit old-fashioned. It makes use of a microthread Queue—a first-in first-out list. As soon as a girl reaches her marriageable age, she debuts and is put in the partnerQueue. When the men are old enough to marry, they start looking for a partner. The world de-queues one girl; no other man can reach her. If they happen to like() each other, very soon a child will be born. If not, the girl is returned to the bottom of the queue.

Of course, this is merely a model; if you want a more realistic model, you might want to implement two queues, to give both sexes a chance to actively find a partner. Note that the agent proposes; but it's the "proposee" who decides whether she likes him.

Adding a GUI

Creating a GUI for a multithreaded application is an interesting challenge (see Figure 1). Unless the GUI library supports threading really well (and few do), you can't access the GUI from the threads themselves. Thus, an agent or the world cannot directly set texts in GUI text fields, for example. The GUI will have to take a more active role.

Figure 1 A PyQt GUI for a microthreaded application. (Toolbar icons by the KDE project.)

The solution is to use a GUI timer object that periodically takes a look at the world statistics. PyQt has another mechanism, too, where an object can send "signals" containing a bit of data to "slots." I use this signal/slot technique for the messages, but the timer for the display of statistics:

  def slotMessage(self, message):
    self.view.txtMessages.insertLine(message)
    self.view.txtMessages.
       setCursorPosition(self.view.txtMessages.numLines(),
                0)

  def monitor(self):
    timer=QTimer(self, "monitor timer")
    self.connect(timer,
           SIGNAL("timeout()"),
           self.slotUpdateStatistics)
    timer.start(1000, FALSE)

The slotUpdateStatistics() function asks the world about the number of threads running. To access this information, it's necessary to get at the right scheduler. Every Python thread contains its own microthreading scheduler. When the world starts to run, the correct scheduler is saved in an instance variable for easy reference.

  def slotUpdateStatistics(self):
    scheduler=self.world.scheduler()
    self.view.txtThreads.
       setText(str(len(scheduler.getAllThreads())))

When the number of threads has dropped to zero, the population has died out...

  def slotGo(self):
    self.statusBar().message("Starting up...")
    self.worldThread =
       threading.Thread(None, self.world.go, "worldThread")
    self.worldThread.start()
    self.statusBar().message("Running")

  def slotStop(self):
    self.statusBar().message("Stopping...")
    self.world.stop()
    self.statusBar().message("Stopped")

  def slotQuit(self):
    self.world.stop() # stop the world
    qApp.quit()    # stop the gui
    raise SystemExit  # stop all remaining macro-threads

Starting and stopping threads is a challenge in itself. We want the GUI to stay responsive, and, if possible, not crash because another thread accesses a GUI object. Therefore, the whole world, microthreads and all, is started in a second Python thread. There's always one thread running in Python, the Main thread—that's where the GUI runs.

Stopping the world is not so difficult; we use the stop() function shown above. Quitting the application without segfaults is more difficult. First, the world is stopped. Then the GUI application is quit. But because we've been fooling around with threads, some might still be running. Not closing them leads to all kinds of interesting crashes. I've even managed to kill my X server today! So, by raising the SystemExit exception, every Python thread and every microthread is told to stop immediately.

  • + Share This
  • 🔖 Save To Your Account