Changeset 1127


Ignore:
Timestamp:
19.01.2016 16:49:14 (4 years ago)
Author:
klaus
Message:

thread-safe access to stations DB using singleton class copies

Location:
SHX/trunk/SeismicHandler
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • SHX/trunk/SeismicHandler/commands/meta.py

    r1126 r1127  
    1010    traces_from_list, get_meta_status, META_STATUS_LOCATION, \ 
    1111    META_STATUS_COMPLETE, META_STATUS_EMPTY 
    12 from SeismicHandler.modules.stations import ChannelMeta, Stations, Stations1,\ 
     12from SeismicHandler.modules.stations import ChannelMeta, Stations, \ 
    1313    triggerDatabaseReload 
    1414from SeismicHandler.config import Settings 
     
    3838        "ADDR", 
    3939        "DEL_INCOMPLETE", 
    40         "THID" 
    4140    ] 
    4241     
     
    6160            fdsnadr = self.qualifiers["ADDR"] 
    6261         
    63         if self.qualifiers["THID"]: 
    64             self.stations = Stations1() 
    65         else: 
    66             print "dbg: meta no thid" 
    67             self.stations = Stations() 
     62        self.stations = Stations() 
    6863 
    6964        if subcmd == 'list': 
  • SHX/trunk/SeismicHandler/modules/stations.py

    r1126 r1127  
    66 
    77import copy 
     8import threading 
    89import sqlalchemy as sa 
    910import sqlalchemy.orm as orm 
     
    225226 
    226227 
    227 class Stations(object): 
     228class StationsThread(object): 
    228229    """ 
    229230    Supply station read/write access. 
    230231    """ 
     232    # need several instances, one for each thread 
    231233    __metaclass__ = Singleton 
    232234     
     
    432434 
    433435 
    434 class Stations1(object): 
    435     """ 
    436     Supply station read/write access. 
    437     """ 
    438     __metaclass__ = Singleton 
    439      
    440     stations = {} 
    441     channels = {} 
    442  
    443     dbsessions = {} 
    444      
    445     reloadRequested = False 
    446  
    447     def __init__(self): 
    448         self.read() 
    449  
    450     def __setitem__(self, name, data): 
    451         """ 
    452         Save or update channel information. 
    453         """ 
    454         if not isinstance(data, ChannelMeta): 
    455             raise Exception("Only channel specific information can be updated " 
    456                             "or saved!") 
    457  
    458     def read(self, clear=False): 
    459         """ 
    460         Init / refresh station information. 
    461         """ 
    462         self.reloadRequested = False 
    463         # read only information 
    464         data = [] 
    465         for db in Settings.config.inventory.readonly: 
    466             data += self.__readDB(db) 
    467  
    468         # data base access r/w 
    469         self.dbreadwrite = Settings.config.inventory.database[0] 
    470         data += self.__readDB(create=True) 
    471          
    472         if clear: 
    473             self.stations = {} 
    474             self.channels = {} 
    475  
    476         for i in data: 
    477             if not i._name in self.stations: 
    478                 self.stations[i._name] = {} 
    479  
    480             try: 
    481                 self.stations[i._name][i._canal].append(i) 
    482                 self.channels[i.channel].append(i) 
    483             except KeyError: 
    484                 self.stations[i._name][i._canal] = [i] 
    485                 self.channels[i.channel] = [i] 
    486  
    487 #        print self.stations 
    488  
    489     def __readDB(self, db=None, create=False): 
    490         """ 
    491         Read raw channel data from given database. 
    492         """ 
    493         if not db: 
    494             db = self.dbreadwrite 
    495  
    496         # init db session only if necessary 
    497         if self.dbsessions.get(db, None) is None: 
    498             engine = sa.create_engine(db) 
    499  
    500             if create: 
    501                 tabledata = Base.metadata 
    502                 tabledata.create_all(engine) 
    503                 Session = orm.sessionmaker(bind=engine) 
    504             else: 
    505                 Session = orm.sessionmaker(bind=engine, autoflush=False) 
    506             s = Session() 
    507             if not create: 
    508                 # monkey patching *urgs* 
    509                 s.flush = saReadonly 
    510             self.dbsessions[db] = s 
    511  
    512         try: 
    513             a = self.dbsessions[db].query(ChannelMeta).order_by("ondate").all() 
    514             return a 
    515         except KeyError: 
    516             return [] 
    517         except OperationalError as E: 
    518             # sqlite sometimes does weird things, we log it for now 
    519             log_message("debug.stations", "%s: %s" % (db, str(E))) 
    520             return [] 
    521  
    522     def __getitem__(self, codedate): 
    523         """ 
    524         Return channel meta data from channel code and time information. Input 
    525         parameter "codedate" is supposed to be a string containing the channels 
    526         code or a list/tuple of channel and UTCDateTime information. 
    527         """ 
    528         # reload data base if requested 
    529         if self.reloadRequested: 
    530             print "dbg: perform requested update on Stations1" 
    531             self.read() 
    532  
    533         try: 
    534             code, date = codedate 
    535         except: 
    536             code = codedate 
    537             date = UTCDateTime() 
    538  
    539         code = code.upper() 
    540         meta = self.channels.get(code, None) 
    541         if meta is None: 
    542             raise KeyError("no meta data found at all for '%s'" % code) 
    543  
    544         match = None 
    545         for m in meta: 
    546             if m._start > date: 
    547                 continue 
    548  
    549             # None == open end 
    550             if m._end is not None and m._end <= date: 
    551                 continue 
    552  
    553             match = m 
    554             break 
    555  
    556         if match: 
    557             return match 
    558  
    559         raise KeyError("no meta data of '%s' found for '%s'" % (code, date)) 
    560  
    561     def add(self, station, replace=False, local=False): 
    562         """ 
    563         Add station to database. If "local" is set to True, the data will be 
    564         saved in user's database regardless if meta data is present already. 
    565         """ 
    566         # XXX check for update -> dirty session 
    567  
    568         if not isinstance(station, ChannelMeta): 
    569             raise ValueError("Wrong data type") 
    570  
    571         # reload data base if requested 
    572         if self.reloadRequested: 
    573             print "dbg: perform requested update on Stations1" 
    574             self.read() 
    575  
    576         session = self.dbsessions[self.dbreadwrite] 
    577  
    578         # try to save local in any case 
    579         if local: 
    580             try: 
    581                 session.add(station) 
    582                 session.commit() 
    583             except Exception as E: 
    584                 # query existing data 
    585                 session.rollback() 
    586                 conflict = session.query(ChannelMeta).filter_by( \ 
    587                        channel=station.channel, ondate=station.ondate).all()[0] 
    588  
    589                 # offdate identical 
    590                 if conflict.offdate <= station.offdate: 
    591                     session.delete(conflict) 
    592                 else: 
    593                     conflict.ondate = station.offdate 
    594                     conflict.ondatems = station.offdatems 
    595  
    596                 session.commit() 
    597  
    598                 # finally add new data 
    599                 session.add(station) 
    600                 session.commit() 
    601         else: 
    602             # check for conflict 
    603             try: 
    604                 update = self[station.channel, station._start] 
    605                 if update == station: 
    606                     return 
    607  
    608                 if not replace: 
    609                     raise ValueError("Concurrent data present! % " % station) 
    610  
    611                 session.delete(update) 
    612             except KeyError as E: 
    613                 if E.message.startswith("ond"): 
    614                     print E 
    615                     import pdb; pdb.set_trace() 
    616             except Exception as E: 
    617                 print "xx", E 
    618                 import pdb; pdb.set_trace() 
    619  
    620             session.add(station) 
    621  
    622             try: 
    623                 session.commit() 
    624             except Exception as e: 
    625                 print e 
    626 #                import pdb; pdb.set_trace() 
    627                 session.rollback() 
    628          
    629         # self.stations and self.channels not updated, 
    630         # automatic reread would be overkill 
    631         # applications have to call 'stations.read( clear=True )' after changes 
    632          
    633  
    634     def fetch(self, station): 
    635         """ 
    636         Fetch information from webdc servers. 
    637         """ 
    638         pass 
     436def Stations(): 
     437    "Replaces the former singleton class calls." 
     438    global stations_obj 
     439    thid = threading.currentThread().ident 
     440    if thid not in stations_obj.keys(): 
     441        stations_obj[thid] = copy.copy( stations_obj[main_thread] ) 
     442        stations_obj[thid].dbsessions = {} 
     443        stations_obj[thid].read() 
     444    return stations_obj[thid] 
    639445 
    640446 
     
    681487    "Workaround. To be optimized." 
    682488    print "dbg: DB reload triggered" 
    683     Stations().reloadRequested = True 
    684     Stations1().reloadRequested = True 
     489    global stations_obj 
     490    for k in stations_obj.keys(): 
     491        stations_obj[k].reloadRequested = True 
     492 
     493main_thread = threading.currentThread().ident 
     494stations_obj = { 
     495    main_thread : StationsThread(), 
     496} 
    685497 
    686498 
    687499if __name__ == "__main__": 
    688500    stations = Stations() 
    689     stations1 = Stations1() 
    690501    print stations[("GR.GRA1..BHZ", UTCDateTime())] 
  • SHX/trunk/SeismicHandler/modules/traces.py

    r1126 r1127  
    1212from obspy.core.stream import Stream, Trace 
    1313from SeismicHandler.config import Settings, set_dsptrcs, set_tottrcs 
    14 from SeismicHandler.modules.stations import Stations, Stations1 
     14from SeismicHandler.modules.stations import Stations 
    1515 
    1616 
     
    128128    def getMetaStatus( self, trace ): 
    129129        "Returns 0, 1 or 2 depending on metadata status." 
    130         stations = Stations1() 
     130        stations = Stations() 
    131131        sname = "%s.%s.%s.%s" % (trace.stats.network, trace.stats.station, 
    132132            trace.stats.location, trace.stats.channel) 
  • SHX/trunk/SeismicHandler/modules/wx_.py

    r1125 r1127  
    15211521     
    15221522    def OnCompleteMeta( self, e ): 
    1523         _sendShCommand( "meta all complete /thid=1" ) 
     1523        ap = AnalysisPar() 
     1524        _sendShCommand( 
     1525            "meta all complete /addr=%s" % ap.getValueAsString('readws_server') 
     1526        ) 
    15241527 
    15251528    def OnSortByDistance( self, e ): 
Note: See TracChangeset for help on using the changeset viewer.