Changeset 429


Ignore:
Timestamp:
03.08.2011 17:44:27 (8 years ago)
Author:
marcus
Message:

inter-thread communication tests with 140 parallel threads

File:
1 edited

Legend:

Unmodified
Added
Removed
  • SHX/trunk/sandbox/pubsubthread.py

    r428 r429  
    99import wx 
    1010from random import randint 
    11 from SeismicHandler.utils.pubsub import pub as msgs 
     11from SeismicHandler.utils.pubsub import pub 
    1212from SeismicHandler.utils.wxAnyThread import anythread 
     13 
     14#threadinput = ['one', 'two', 'three', 'four', 'five', 'six', 'seven'] 
     15#threadinput = map(chr, range(65, 91) + range(97, 123)) 
     16#threadinput = map(chr, range(65, 91)) 
     17threadinput = map(chr, range(97, 103) + range(65, 71)) 
     18threadnames = [] 
     19for i in threadinput: 
     20    for j in threadinput: 
     21        threadnames.append(i+j) 
    1322 
    1423class thready(threading.Thread): 
     
    2130        self.name = "thread.%s" % name 
    2231        self.main = main 
     32        self.stopped = True 
    2333 
    24         msgs.subscribe(self.monitor, "thread.%s" % name) 
     34        pub.subscribe(self.monitor, "thread.%s" % name) 
    2535 
    2636    def run(self): 
     
    3848            # random update of main threads status bar 
    3949            if randint(0, 20) == 10: 
    40                 msgs.sendMessage("ui.change.statustext", text=self.name) 
    41                 msgs.sendMessage(self.name, content="self called") 
     50                pub.sendMessage("ui.change.statustext", text=self.name) 
     51                pub.sendMessage(self.name, content="self called") 
    4252 
    4353            time.sleep(.1) 
    4454 
    4555    def monitor(self, content): 
     56        if self.stopped: 
     57            return 
     58 
    4659        print time.strftime("%H:%M:%S", time.localtime()), self.name, \ 
    4760                                                           "received:", content 
    48         if not self.name.endswith("two"): 
    49             msgs.sendMessage("thread.two", content="test from "+self.name) 
     61 
     62#        return 
     63        call = threadnames[randint(1, len(threadnames))-1] 
     64        if not self.name.endswith(call) and randint(0, 1): 
     65            try: 
     66                pub.sendMessage("thread.%s" % call, 
     67                                         content="via monitor from "+self.name) 
     68            except RuntimeError, e: 
     69                print e 
     70                self.stop() 
    5071 
    5172    def stop(self): 
     73        print time.strftime("%H:%M:%S", time.localtime()), self.name, "got stop" 
    5274        self.stopped = True 
     75        pub.unsubscribe(self.monitor, self.name) 
    5376 
    5477 
     
    6285        self.Bind(wx.EVT_CLOSE, self.OnQuit) 
    6386 
    64         tlist = ['one', 'two', 'three', 'four', 'five', 'six', 'seven'] 
     87        threads = {} 
     88        for name in threadnames: 
     89            threads[name] = thready(name, self) 
     90            print time.strftime("%H:%M:%S"), "start", name 
     91            threads[name].start() 
     92        self.threads = threads 
    6593 
    66         for i, name in enumerate(tlist): 
    67             tlist[i] = thready(name, self) 
    68             print time.strftime("%H:%M:%S"), "start", name 
    69             tlist[i].start() 
    70         self.tlist = tlist 
    71  
    72         msgs.subscribe(self.ListenUpdateStatusText, "ui.change.statustext") 
     94        pub.subscribe(self.ListenUpdateStatusText, "ui.change.statustext") 
    7395 
    7496        self.Show() 
    7597 
    7698    def OnQuit(self, evt): 
    77         for t in self.tlist: 
    78             print time.strftime("%H:%M:%S"), "stopping", t.name 
    79             t.stop() 
    80             t.join() 
     99        for name in self.threads: 
     100            print time.strftime("%H:%M:%S"), "stopping", name 
     101            if self.threads[name].isAlive(): 
     102                self.threads[name].stop() 
     103 
     104        final = False 
     105        threads = self.threads.values() 
     106        tries = 0 
     107        while not final: 
     108            final = True 
     109             
     110            for i, t in enumerate(threads[:]): 
     111                print time.strftime("%H:%M:%S"), "joining", t.name 
     112                if t.isAlive(): 
     113                    t.join(1.1) 
     114                     
     115                    if not t.isAlive(): 
     116                        del threads[i] 
     117                         
     118            if threads and tries < 20: 
     119                print "%u threads left" % len(threads) 
     120                time.sleep(1) 
     121                tries += 1 
     122            else: 
     123                final = True 
     124                 
     125            print tries 
    81126        evt.Skip() 
    82127 
     
    96141 
    97142 
    98 def monitorGlobal(topic=msgs.AUTO_TOPIC, **kwargs): 
    99     print time.strftime("%H:%M:%S", time.localtime()), "monitorGlobal", topic, kwargs 
    100 msgs.subscribe(monitorGlobal, msgs.ALL_TOPICS) 
     143def monitorGlobal(topic=pub.AUTO_TOPIC, **kwargs): 
     144    print time.strftime("%H:%M:%S", time.localtime()), "monitorGlobal", \ 
     145                                                                  topic, kwargs 
     146pub.subscribe(monitorGlobal, pub.ALL_TOPICS) 
    101147 
    102148 
Note: See TracChangeset for help on using the changeset viewer.