Changeset 429
- Timestamp:
- 08/03/2011 05:44:27 PM (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
SHX/trunk/sandbox/pubsubthread.py
r428 r429 9 9 import wx 10 10 from random import randint 11 from SeismicHandler.utils.pubsub import pub as msgs11 from SeismicHandler.utils.pubsub import pub 12 12 from SeismicHandler.utils.wxAnyThread import anythread 13 14 #threadinput = ['one', 'two', 'three', 'four', 'five', 'six', 'seven'] 15 #threadinput = map(chr, range(65, 91) + range(97, 123)) 16 #threadinput = map(chr, range(65, 91)) 17 threadinput = map(chr, range(97, 103) + range(65, 71)) 18 threadnames = [] 19 for i in threadinput: 20 for j in threadinput: 21 threadnames.append(i+j) 13 22 14 23 class thready(threading.Thread): … … 21 30 self.name = "thread.%s" % name 22 31 self.main = main 32 self.stopped = True 23 33 24 msgs.subscribe(self.monitor, "thread.%s" % name)34 pub.subscribe(self.monitor, "thread.%s" % name) 25 35 26 36 def run(self): … … 38 48 # random update of main threads status bar 39 49 if randint(0, 20) == 10: 40 msgs.sendMessage("ui.change.statustext", text=self.name)41 msgs.sendMessage(self.name, content="self called")50 pub.sendMessage("ui.change.statustext", text=self.name) 51 pub.sendMessage(self.name, content="self called") 42 52 43 53 time.sleep(.1) 44 54 45 55 def monitor(self, content): 56 if self.stopped: 57 return 58 46 59 print time.strftime("%H:%M:%S", time.localtime()), self.name, \ 47 60 "received:", content 48 if not self.name.endswith("two"): 49 msgs.sendMessage("thread.two", content="test from "+self.name) 61 62 # return 63 call = threadnames[randint(1, len(threadnames))-1] 64 if not self.name.endswith(call) and randint(0, 1): 65 try: 66 pub.sendMessage("thread.%s" % call, 67 content="via monitor from "+self.name) 68 except RuntimeError, e: 69 print e 70 self.stop() 50 71 51 72 def stop(self): 73 print time.strftime("%H:%M:%S", time.localtime()), self.name, "got stop" 52 74 self.stopped = True 75 pub.unsubscribe(self.monitor, self.name) 53 76 54 77 … … 62 85 self.Bind(wx.EVT_CLOSE, self.OnQuit) 63 86 64 tlist = ['one', 'two', 'three', 'four', 'five', 'six', 'seven'] 87 threads = {} 88 for name in threadnames: 89 threads[name] = thready(name, self) 90 print time.strftime("%H:%M:%S"), "start", name 91 threads[name].start() 92 self.threads = threads 65 93 66 for i, name in enumerate(tlist): 67 tlist[i] = thready(name, self) 68 print time.strftime("%H:%M:%S"), "start", name 69 tlist[i].start() 70 self.tlist = tlist 71 72 msgs.subscribe(self.ListenUpdateStatusText, "ui.change.statustext") 94 pub.subscribe(self.ListenUpdateStatusText, "ui.change.statustext") 73 95 74 96 self.Show() 75 97 76 98 def OnQuit(self, evt): 77 for t in self.tlist: 78 print time.strftime("%H:%M:%S"), "stopping", t.name 79 t.stop() 80 t.join() 99 for name in self.threads: 100 print time.strftime("%H:%M:%S"), "stopping", name 101 if self.threads[name].isAlive(): 102 self.threads[name].stop() 103 104 final = False 105 threads = self.threads.values() 106 tries = 0 107 while not final: 108 final = True 109 110 for i, t in enumerate(threads[:]): 111 print time.strftime("%H:%M:%S"), "joining", t.name 112 if t.isAlive(): 113 t.join(1.1) 114 115 if not t.isAlive(): 116 del threads[i] 117 118 if threads and tries < 20: 119 print "%u threads left" % len(threads) 120 time.sleep(1) 121 tries += 1 122 else: 123 final = True 124 125 print tries 81 126 evt.Skip() 82 127 … … 96 141 97 142 98 def monitorGlobal(topic=msgs.AUTO_TOPIC, **kwargs): 99 print time.strftime("%H:%M:%S", time.localtime()), "monitorGlobal", topic, kwargs 100 msgs.subscribe(monitorGlobal, msgs.ALL_TOPICS) 143 def monitorGlobal(topic=pub.AUTO_TOPIC, **kwargs): 144 print time.strftime("%H:%M:%S", time.localtime()), "monitorGlobal", \ 145 topic, kwargs 146 pub.subscribe(monitorGlobal, pub.ALL_TOPICS) 101 147 102 148
Note: See TracChangeset
for help on using the changeset viewer.