source: SHX/trunk/SeismicHandler/modules/stations.py @ 1136

Revision 1136, 14.9 KB checked in by klaus, 4 years ago (diff)

save and recover of traces and parameters

  • Property svn:eol-style set to native
Line 
1# -*- coding: utf-8 -*-
2
3#    This file is part of Seismic Handler eXtended (SHX). For terms of use and
4#    license information please see license.txt and visit
5#    http://www.seismic-handler.org/portal/wiki/Shx/LicenseTerms
6
7import copy
8import threading
9import sqlalchemy as sa
10import sqlalchemy.orm as orm
11from sqlalchemy.ext.declarative import declarative_base
12from sqlalchemy.exc import OperationalError
13
14from obspy.core import UTCDateTime
15from SeismicHandler.config import Settings
16from SeismicHandler.basics import Singleton
17from SeismicHandler.basics.messages import log_message
18
19FUTURE_DATE = UTCDateTime(3e10)
20
21
22Base = declarative_base()
23class ChannelMeta(Base):
24    """
25    Table holding stations inventory for single channels (e.g. GR.GRA1..BHZ).
26
27    All floating point data is stored as string in order to prevent floating
28    point issues.
29    """
30    __tablename__ = "meta"
31
32    # internal use only
33    id = sa.Column(sa.Integer, primary_key=True)
34
35    # GrÀfenberg A1
36    description = sa.Column(sa.Unicode(length=100), default=u"")
37    # GR.GRA1..BHZ
38    channel = sa.Column(sa.String(length=15))
39    # GR
40    network = sa.Column(sa.String(length=2), index=True)
41    # GRA1X
42    station = sa.Column(sa.String(length=5), index=True)
43    # 01
44    location = sa.Column(sa.String(length=2), index=True)
45    # BH
46    stream = sa.Column(sa.String(length=2), index=True)
47    # Z
48    component = sa.Column(sa.String(length=1), index=True)
49    # GRF
50    arraycode = sa.Column(sa.String(length=10), index=True, default=None)
51    # SQL does not always support datetime with microseconds
52    ondate = sa.Column(sa.DateTime, index=True)
53    ondatems = sa.Column(sa.Integer, index=True)
54    offdate = sa.Column(sa.DateTime, index=True)
55    offdatems = sa.Column(sa.Integer, index=True)
56    latitude = sa.Column(sa.String(length=20))
57    longitude = sa.Column(sa.String(length=20))
58    elevation = sa.Column(sa.String(length=20))
59    depth = sa.Column(sa.String(length=20), default="0.")
60    # array coordinate offset
61    offsetx = sa.Column(sa.String(length=20), default="")
62    offsety = sa.Column(sa.String(length=20), default="")
63    gain = sa.Column(sa.String(length=20), default=None)
64    zeros = sa.Column(sa.Text)
65    poles = sa.Column(sa.Text)
66
67    # channel + start must be unique
68    __table_args__ = (
69        sa.UniqueConstraint("channel", "ondate"), {}
70    )
71
72    def __init__(self, network, station, location, stream, component, \
73                       latitude, longitude, gain, poles, zeros, start, \
74                       elevation="0.", end=None, description=u"", depth="0.", \
75                       arraycode='', offsetx='', offsety=''):
76
77        self.channel = ".".join([network, station, location, \
78                                                 "".join([stream, component])])
79
80        keys = locals().keys()
81        del keys[keys.index("self")]
82
83        # must be handled separately
84        del keys[keys.index("start")]
85        del keys[keys.index("end")]
86
87        # save all other information into object
88        for i in keys:
89            setattr(self, i, locals()[i])
90
91        # handle start & end
92        # always cut off ms part since most systems do not support it
93        self.ondate = (start - start.microsecond/1e6).datetime
94        self.ondatems = start.microsecond
95
96        if end is not None and end == FUTURE_DATE:
97            end = None
98        if end:
99            self.offdate = (end - end.microsecond/1e6).datetime
100            self.offdatems = end.microsecond
101
102    def __repr__(self):
103        if self.offdate:
104            return "<Channel:%s %s to %s>" % (
105                self.channel,
106                self.ondate.strftime("%Y-%m-%d"),
107                self.offdate.strftime("%Y-%m-%d")
108            )
109        else:
110            return "<Channel:%s %s to NOW>" % (
111                self.channel,
112                self.ondate.strftime("%Y-%m-%d")
113            )
114
115    def __getattr__(self, name):
116        """
117        Special handling for start and end date, combination of network,
118        station and location (name) location, stream and component (canal),
119        poles and zeros.
120
121        For data like latitude, longitude, depth, elevation and gain a floating
122        point representation is returned.
123
124        This works only if an underscore prefix is used (e.g. _latitude). If
125        the plain column name is used, the raw db content is returned
126        (actually this method is not called at all, if an apropriate db column
127        exists).
128        """
129        if not name.startswith('_'):
130            return self.__getattribute__(name)
131
132        name = name.lower()[1:]
133        if name not in ["start", "end", "name", "canal"]:
134            value = self.__getattribute__(name)
135            if name in ["latitude", "longitude", "depth", "elevation", "gain"]:
136                return float(value)
137
138            if name in ["poles", "zeros"]:
139                # remove surrounding brackets []
140                value = value.strip()[1:-1]
141                chain = []
142                # parse values
143                for _i in value.split(','):
144                    chain.append(complex(_i))
145                return chain
146
147            return value
148
149        if name == "name":
150            return ".".join([
151                self.__dict__["network"],
152                self.__dict__["station"],
153                self.__dict__["location"],
154            ])
155
156        if name == "canal":
157            return "".join([
158                self.__dict__["stream"],
159                self.__dict__["component"]
160            ])
161
162        # build UTCDateTime from o[n|ff]date[ms]
163        if name == "start":
164            return UTCDateTime(self.__dict__["ondate"]) + \
165                                                self.__dict__["ondatems"] / 1e6
166       
167        # "end"
168        if not self.__dict__["offdate"]:
169            return None
170
171        return UTCDateTime(self.__dict__["offdate"]) + \
172                                            self.__dict__["offdatems"] / 1e6
173
174    def __compare(self, other):
175        """
176        Compare station information. All fields except for id and those starting
177        with underscore are compared.
178
179        FIR information is optional and not included!
180        """
181
182        for i in self.__dict__:
183            if i.startswith("_") or i == "id":
184                continue
185
186            if getattr(self, i) != getattr(other, i):
187                return False
188
189        # fir information is optional XXX
190           
191        return True
192
193    def __eq__(self, other):
194        return self.__compare(other)
195
196    def __ne__(self, other):
197        return not self.__compare(other)
198
199
200class FiR(Base):
201    """
202    Table holding finite impulse responses.
203    """
204    __tablename__ = "fir"
205
206    # internal use only
207    id = sa.Column(sa.Integer, primary_key=True)
208    channel_id = sa.Column(sa.Integer, sa.ForeignKey('meta.id'))
209
210    stage = sa.Column(sa.Integer, index=True)
211    coefficients = sa.Column(sa.Text)
212
213    channel = orm.relation(ChannelMeta, backref=orm.backref('fir', order_by=stage))
214
215    # ChannelMeta_id + stage must be unique
216    __table_args__ = (
217        sa.UniqueConstraint("channel_id", "stage"), {}
218    )
219
220    def __init__(self, stage, coefficients):
221        self.stage = stage
222        self.coefficients = coefficients
223
224    def __repr__(self):
225        return "<FIR:%s>" % self.channel
226
227
228class StationsThread(object):
229    """
230    Supply station read/write access.
231    """
232    # need several instances, one for each thread
233    __metaclass__ = Singleton
234   
235    stations = {}
236    channels = {}
237
238    dbsessions = {}
239   
240    reloadRequested = False
241
242    def __init__(self):
243        self.read()
244
245    def __setitem__(self, name, data):
246        """
247        Save or update channel information.
248        """
249        if not isinstance(data, ChannelMeta):
250            raise Exception("Only channel specific information can be updated "
251                            "or saved!")
252
253    def read(self, clear=False):
254        """
255        Init / refresh station information.
256        """
257        self.reloadRequested = False
258        # read only information
259        data = []
260        for db in Settings.config.inventory.readonly:
261            data += self.__readDB(db)
262
263        # data base access r/w
264        self.dbreadwrite = Settings.config.inventory.database[0]
265        data += self.__readDB(create=True)
266       
267        if clear:
268            self.stations = {}
269            self.channels = {}
270
271        for i in data:
272            if not i._name in self.stations:
273                self.stations[i._name] = {}
274
275            try:
276                self.stations[i._name][i._canal].append(i)
277                self.channels[i.channel].append(i)
278            except KeyError:
279                self.stations[i._name][i._canal] = [i]
280                self.channels[i.channel] = [i]
281
282#        print self.stations
283
284    def __readDB(self, db=None, create=False):
285        """
286        Read raw channel data from given database.
287        """
288        if not db:
289            db = self.dbreadwrite
290
291        # init db session only if necessary
292        if self.dbsessions.get(db, None) is None:
293            engine = sa.create_engine(db)
294
295            if create:
296                tabledata = Base.metadata
297                tabledata.create_all(engine)
298                Session = orm.sessionmaker(bind=engine)
299            else:
300                Session = orm.sessionmaker(bind=engine, autoflush=False)
301            s = Session()
302            if not create:
303                # monkey patching *urgs*
304                s.flush = saReadonly
305            self.dbsessions[db] = s
306
307        try:
308            a = self.dbsessions[db].query(ChannelMeta).order_by("ondate").all()
309            return a
310        except KeyError:
311            return []
312        except OperationalError as E:
313            # sqlite sometimes does weird things, we log it for now
314            log_message("debug.stations", "%s: %s" % (db, str(E)))
315            return []
316
317    def __getitem__(self, codedate):
318        """
319        Return channel meta data from channel code and time information. Input
320        parameter "codedate" is supposed to be a string containing the channels
321        code or a list/tuple of channel and UTCDateTime information.
322        """
323        # reload data base if requested
324        if self.reloadRequested:
325            self.read()
326
327        try:
328            code, date = codedate
329        except:
330            code = codedate
331            date = UTCDateTime()
332
333        code = code.upper()
334        meta = self.channels.get(code, None)
335        if meta is None:
336            raise KeyError("no meta data found at all for '%s'" % code)
337
338        match = None
339        for m in meta:
340            if m._start > date:
341                continue
342
343            # None == open end
344            if m._end is not None and m._end <= date:
345                continue
346
347            match = m
348            break
349
350        if match:
351            return match
352
353        raise KeyError("no meta data of '%s' found for '%s'" % (code, date))
354
355    def add(self, station, replace=False, local=False):
356        """
357        Add station to database. If "local" is set to True, the data will be
358        saved in user's database regardless if meta data is present already.
359        """
360        # XXX check for update -> dirty session
361
362        if not isinstance(station, ChannelMeta):
363            raise ValueError("Wrong data type")
364
365        # reload data base if requested
366        if self.reloadRequested:
367            self.read()
368
369        session = self.dbsessions[self.dbreadwrite]
370
371        # try to save local in any case
372        if local:
373            try:
374                session.add(station)
375                session.commit()
376            except Exception as E:
377                # query existing data
378                session.rollback()
379                conflict = session.query(ChannelMeta).filter_by( \
380                       channel=station.channel, ondate=station.ondate).all()[0]
381
382                # offdate identical
383                if conflict.offdate <= station.offdate:
384                    session.delete(conflict)
385                else:
386                    conflict.ondate = station.offdate
387                    conflict.ondatems = station.offdatems
388
389                session.commit()
390
391                # finally add new data
392                session.add(station)
393                session.commit()
394        else:
395            # check for conflict
396            try:
397                update = self[station.channel, station._start]
398                if update == station:
399                    return
400
401                if not replace:
402                    raise ValueError("Concurrent data present! % " % station)
403
404                session.delete(update)
405            except KeyError as E:
406                if E.message.startswith("ond"):
407                    print E
408                    import pdb; pdb.set_trace()
409            except Exception as E:
410                print "xx", E
411                import pdb; pdb.set_trace()
412
413            session.add(station)
414
415            try:
416                session.commit()
417            except Exception as e:
418                print e
419#                import pdb; pdb.set_trace()
420                session.rollback()
421       
422        # self.stations and self.channels not updated,
423        # automatic reread would be overkill
424        # applications have to call 'stations.read( clear=True )' after changes
425       
426
427    def fetch(self, station):
428        """
429        Fetch information from webdc servers.
430        """
431        pass
432
433
434def Stations():
435    "Replaces the former singleton class calls."
436    global stations_obj
437    thid = threading.currentThread().ident
438    if thid not in stations_obj.keys():
439        stations_obj[thid] = copy.copy( stations_obj[main_thread] )
440        stations_obj[thid].dbsessions = {}
441        stations_obj[thid].read()
442    return stations_obj[thid]
443
444
445
446def resolveStations(stations):
447    """
448    Method for resolving station groups and/or aliases.
449
450    Returns a set of station codes in miniseed dialect.
451    """
452
453    # split string at comma, otherwise assume a list or tuple
454    if isinstance(stations, basestring):
455        stations = [i.strip() for i in stations.split(",")]
456
457    # first: station group
458    intermediate = []
459    for s in stations:
460        try:
461            intermediate += Settings.config["station-groups"][s.lower()]
462        except:
463            intermediate.append(s)
464
465    # lookup station aliases
466    final = []
467    for s in intermediate:
468        rpl = Settings.config["station-aliases"].get(s.lower(), [s,])[0]
469        # handle duplicate stations
470        if rpl not in final:
471            final.append(rpl)
472
473    return final
474
475
476def saReadonly(*args, **kwargs):
477    """
478    Dummy method for supporting read-only access to SQLite DB
479
480    http://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
481    """
482    return
483
484def triggerDatabaseReload():
485    "Workaround. To be optimized."
486    global stations_obj
487    for k in stations_obj.keys():
488        stations_obj[k].reloadRequested = True
489
490main_thread = threading.currentThread().ident
491stations_obj = {
492    main_thread : StationsThread(),
493}
494
495
496if __name__ == "__main__":
497    stations = Stations()
498    print stations[("GR.GRA1..BHZ", UTCDateTime())]
Note: See TracBrowser for help on using the repository browser.