Changeset 1125


Ignore:
Timestamp:
18.01.2016 23:02:45 (4 years ago)
Author:
klaus
Message:

quick and dirty fix of threading problem with Stations class

Location:
SHX/trunk/SeismicHandler
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • SHX/trunk/SeismicHandler/commands/meta.py

    r1119 r1125  
    1010    traces_from_list, get_meta_status, META_STATUS_LOCATION, \ 
    1111    META_STATUS_COMPLETE, META_STATUS_EMPTY 
    12 from SeismicHandler.modules.stations import ChannelMeta, Stations 
     12from SeismicHandler.modules.stations import ChannelMeta, Stations, Stations1 
    1313from SeismicHandler.config import Settings 
    1414from SeismicHandler.basics import timeit 
     
    3737        "ADDR", 
    3838        "DEL_INCOMPLETE", 
     39        "THID" 
    3940    ] 
    4041     
     
    4950        "Metadata management." 
    5051 
    51         self.stations = Stations() 
    5252        traces = traces_from_list(self.parameters[0]) 
    5353        subcmd = self.parameters[1].lower() 
     
    5959        if self.qualifiers["ADDR"]: 
    6060            fdsnadr = self.qualifiers["ADDR"] 
     61         
     62        if self.qualifiers["THID"]: 
     63            self.stations = Stations1() 
     64        else: 
     65            print "dbg: meta no thid" 
     66            self.stations = Stations() 
    6167 
    6268        if subcmd == 'list': 
     
    188194                    zeros=str(paz.zeros), 
    189195                ) 
    190             Stations().add(meta, replace=True, local=True) 
     196            self.stations.add(meta, replace=True, local=True) 
    191197        # reread DB due to bug in Db interface 
    192         Stations().read( clear=True ) 
     198        self.stations.read( clear=True ) 
    193199        for trc in traces: 
    194200            trc.set_info( 'METASTATUS', get_meta_status( trc ) ) 
  • SHX/trunk/SeismicHandler/modules/stations.py

    r1074 r1125  
    418418 
    419419 
     420class Stations1(object): 
     421    """ 
     422    Supply station read/write access. 
     423    """ 
     424    __metaclass__ = Singleton 
     425     
     426    stations = {} 
     427    channels = {} 
     428 
     429    dbsessions = {} 
     430 
     431    def __init__(self): 
     432        self.read() 
     433 
     434    def __setitem__(self, name, data): 
     435        """ 
     436        Save or update channel information. 
     437        """ 
     438        if not isinstance(data, ChannelMeta): 
     439            raise Exception("Only channel specific information can be updated " 
     440                            "or saved!") 
     441 
     442    def read(self, clear=False): 
     443        """ 
     444        Init / refresh station information. 
     445        """ 
     446        # read only information 
     447        data = [] 
     448        for db in Settings.config.inventory.readonly: 
     449            data += self.__readDB(db) 
     450 
     451        # data base access r/w 
     452        self.dbreadwrite = Settings.config.inventory.database[0] 
     453        data += self.__readDB(create=True) 
     454         
     455        if clear: 
     456            self.stations = {} 
     457            self.channels = {} 
     458 
     459        for i in data: 
     460            if not i._name in self.stations: 
     461                self.stations[i._name] = {} 
     462 
     463            try: 
     464                self.stations[i._name][i._canal].append(i) 
     465                self.channels[i.channel].append(i) 
     466            except KeyError: 
     467                self.stations[i._name][i._canal] = [i] 
     468                self.channels[i.channel] = [i] 
     469 
     470#        print self.stations 
     471 
     472    def __readDB(self, db=None, create=False): 
     473        """ 
     474        Read raw channel data from given database. 
     475        """ 
     476        if not db: 
     477            db = self.dbreadwrite 
     478 
     479        # init db session only if necessary 
     480        if self.dbsessions.get(db, None) is None: 
     481            engine = sa.create_engine(db) 
     482 
     483            if create: 
     484                tabledata = Base.metadata 
     485                tabledata.create_all(engine) 
     486                Session = orm.sessionmaker(bind=engine) 
     487            else: 
     488                Session = orm.sessionmaker(bind=engine, autoflush=False) 
     489            s = Session() 
     490            if not create: 
     491                # monkey patching *urgs* 
     492                s.flush = saReadonly 
     493            self.dbsessions[db] = s 
     494 
     495        try: 
     496            a = self.dbsessions[db].query(ChannelMeta).order_by("ondate").all() 
     497            return a 
     498        except KeyError: 
     499            return [] 
     500        except OperationalError as E: 
     501            # sqlite sometimes does weird things, we log it for now 
     502            log_message("debug.stations", "%s: %s" % (db, str(E))) 
     503            return [] 
     504 
     505    def __getitem__(self, codedate): 
     506        """ 
     507        Return channel meta data from channel code and time information. Input 
     508        parameter "codedate" is supposed to be a string containing the channels 
     509        code or a list/tuple of channel and UTCDateTime information. 
     510        """ 
     511        try: 
     512            code, date = codedate 
     513        except: 
     514            code = codedate 
     515            date = UTCDateTime() 
     516 
     517        code = code.upper() 
     518        meta = self.channels.get(code, None) 
     519        if meta is None: 
     520            raise KeyError("no meta data found at all for '%s'" % code) 
     521 
     522        match = None 
     523        for m in meta: 
     524            if m._start > date: 
     525                continue 
     526 
     527            # None == open end 
     528            if m._end is not None and m._end <= date: 
     529                continue 
     530 
     531            match = m 
     532            break 
     533 
     534        if match: 
     535            return match 
     536 
     537        raise KeyError("no meta data of '%s' found for '%s'" % (code, date)) 
     538 
     539    def add(self, station, replace=False, local=False): 
     540        """ 
     541        Add station to database. If "local" is set to True, the data will be 
     542        saved in user's database regardless if meta data is present already. 
     543        """ 
     544        # XXX check for update -> dirty session 
     545 
     546        if not isinstance(station, ChannelMeta): 
     547            raise ValueError("Wrong data type") 
     548 
     549        session = self.dbsessions[self.dbreadwrite] 
     550 
     551        # try to save local in any case 
     552        if local: 
     553            try: 
     554                session.add(station) 
     555                session.commit() 
     556            except Exception as E: 
     557                # query existing data 
     558                session.rollback() 
     559                conflict = session.query(ChannelMeta).filter_by( \ 
     560                       channel=station.channel, ondate=station.ondate).all()[0] 
     561 
     562                # offdate identical 
     563                if conflict.offdate <= station.offdate: 
     564                    session.delete(conflict) 
     565                else: 
     566                    conflict.ondate = station.offdate 
     567                    conflict.ondatems = station.offdatems 
     568 
     569                session.commit() 
     570 
     571                # finally add new data 
     572                session.add(station) 
     573                session.commit() 
     574        else: 
     575            # check for conflict 
     576            try: 
     577                update = self[station.channel, station._start] 
     578                if update == station: 
     579                    return 
     580 
     581                if not replace: 
     582                    raise ValueError("Concurrent data present! % " % station) 
     583 
     584                session.delete(update) 
     585            except KeyError as E: 
     586                if E.message.startswith("ond"): 
     587                    print E 
     588                    import pdb; pdb.set_trace() 
     589            except Exception as E: 
     590                print "xx", E 
     591                import pdb; pdb.set_trace() 
     592 
     593            session.add(station) 
     594 
     595            try: 
     596                session.commit() 
     597            except Exception as e: 
     598                print e 
     599#                import pdb; pdb.set_trace() 
     600                session.rollback() 
     601         
     602        # self.stations and self.channels not updated, 
     603        # automatic reread would be overkill 
     604        # applications have to call 'stations.read( clear=True )' after changes 
     605         
     606 
     607    def fetch(self, station): 
     608        """ 
     609        Fetch information from webdc servers. 
     610        """ 
     611        pass 
     612 
     613 
    420614def resolveStations(stations): 
    421615    """ 
     
    459653if __name__ == "__main__": 
    460654    stations = Stations() 
     655    stations1 = Stations1() 
    461656    print stations[("GR.GRA1..BHZ", UTCDateTime())] 
  • SHX/trunk/SeismicHandler/modules/wx_.py

    r1124 r1125  
    15211521     
    15221522    def OnCompleteMeta( self, e ): 
    1523         _sendShCommand( "meta all complete" ) 
     1523        _sendShCommand( "meta all complete /thid=1" ) 
    15241524 
    15251525    def OnSortByDistance( self, e ): 
Note: See TracChangeset for help on using the changeset viewer.