PK G[5@VenueThermoClient.appUT  D{DUx[application] name = Shared Thermocouple Reader mimetype = application/x-ag-venuethermo extension = venuethermo files = VenueThermoClient.py startable = 0 [commands] Open = %(python)s VenueThermoClient.py -a %(appUrl)s PK F5HHVenueThermoClient.pyUT HDDUx# Name: VenueThermoClient.py # Version: 0.3 # # Copyright 2005-6 Christoph Willing # c.willing _at_ uq.edu.au # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 2 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # #------------------------------------------------------------------------------ import os import sys import logging import sys import getopt import time from threading import Timer import agversion agversion.select(3) from AccessGrid import Platform from wxPython.wx import * from wxPython.gizmos import * from AccessGrid.SharedAppClient import SharedAppClient from AccessGrid.Platform.Config import UserConfig from AccessGrid.ClientProfile import ClientProfile from AccessGrid import icons from AccessGrid.Toolkit import WXGUIApplication from AccessGrid.Events import Event from AccessGrid import Log try: from twisted.internet import threadedselectreactor threadedselectreactor.install() except: pass from twisted.internet import reactor ID_EXIT = wxNewId() ID_UNITS = wxNewId() ID_PRESTART_TIMER = wxNewId() ID_UPDATE_TIMER = wxNewId() ID_TIMER_1 = wxNewId() ID_TIMER_2 = wxNewId() if wxVERSION >= (2,6): TEXT_PADDING = 16 LED_SIZES = [(135,40), (200,80), (310,120), (420,160), (530,200), (640,240), (750,280), (860,320), (1280,480)] else: TEXT_PADDING = 0 LED_SIZES = [(90,40), (200,80), (310,120), (420,160), (530,200), (640,240), (750,280), (860,320), (1280,480)] DEF_PRESET_LARGE_SIZE = 6 class VTS_Types: ATYPE = "a_type" BTYPE = "b_type" CTYPE = "c_type" DTYPE = "d_type" class VTS_Temps: ATEMP = "a_temp" BTEMP = "b_temp" CTEMP = "c_temp" DTEMP = "d_temp" class CustomStatusBar(wxStatusBar): def __init__(self, parent, log): wxStatusBar.__init__(self, parent, -1) self.parent = parent self.log = log self.SetFieldsCount(3) # Sets the three fields to be relative widths to each other. self.SetStatusWidths([-3, -2, -5]) self.sizeChanged = False EVT_SIZE(self, self.OnSize) EVT_IDLE(self, self.OnIdle) # Field 0 ... just text self.SetStatusText('J, K, K, T', 0) # Field 1 ... checkbox ID_CB_1 = wxNewId() self.cb = wxCheckBox(self, ID_CB_1, "Faded") EVT_CHECKBOX(self.cb, ID_CB_1, self.OnToggleFaded) self.cb.SetValue(True) # set the initial position of the checkbox self.Reposition() # Field 2 ... time of last data arrival self.SetStatusText('No data received so far ...', 2) # Handles events from the timer we started in __init__(). # We're using it to drive a 'clock' in field 2 (the third field). # def Notify(self): t = time.localtime(time.time()) st = time.strftime("Data: %d-%b-%Y, %I:%M:%S", t) self.SetStatusText(st, 2) def OnToggleFaded(self, evt): if self.cb.GetValue(): self.parent.SetFaded(True) else: self.parent.SetFaded(False) def OnSize(self, evt): self.Reposition() # for normal size events # Set a flag so the idle time handler will also do the repositioning. # It is done this way to get around a buglet where GetFieldRect is not # accurate during the EVT_SIZE resulting from a frame maximize. self.sizeChanged = True def OnIdle(self, evt): if self.sizeChanged: self.Reposition() self.parent.RefreshLed() # reposition the checkbox def Reposition(self): rect = self.GetFieldRect(1) self.cb.SetPosition((rect.x+2, rect.y+2)) self.cb.SetSize((rect.width-4, rect.height-4)) self.sizeChanged = False def SetThermoTypes(self, t): # Check validity of t !!! self.SetStatusText("Types: %s, %s, %s, %s" % (t[0],t[1],t[2],t[3]), 0) def DataRcvd(self): self.Notify() class ViewerFrame(wxFrame): """ """ def __init__(self, parent, ID, log, name=""): wxFrame.__init__(self, parent, ID, "", style=wxDEFAULT_FRAME_STYLE) #style=wxSYSTEM_MENU|wxCAPTION|wxSYSTEM_MENU|wxCAPTION) self.log = log self.SetTitle(name) # Which entry in LED_SIZES to use for large display self.presetLargeSize = DEF_PRESET_LARGE_SIZE menu = wxMenu() menu.Append(ID_EXIT, "&Exit", "Exit") menubar = wxMenuBar() menubar.Append(menu, "&File") self.SetMenuBar(menubar) # The main sizer box # Contains 2 vertical elements: # 1. set of 4 temperature panels # 2. a Celsius/Fahrenheit selector # self.box = wxBoxSizer(wxVERTICAL) # A set of 4 temperature panels laid out horizontally # self.tempPanels = {} self.tempsBox = wxBoxSizer(wxHORIZONTAL) for chn in ['A','B','C','D']: self.tempPanels[chn] = TemperaturePanel( self, log, label="Channel %s"%chn) self.tempsBox.Add(self.tempPanels[chn], 1, wxEXPAND) self.scaleSelect = ScalePanel(self) self.celsius, self.fahrenheit = self.scaleSelect.GetScaleTypes() self.units = self.fahrenheit self.SetScale(1) self.box.Add(self.tempsBox, 0, wxEXPAND|wxALL, border=3) self.box.Add(self.scaleSelect, 1, wxEXPAND|wxALIGN_CENTRE|wxALL, border=3) self.statusbar = CustomStatusBar(self, log) self.SetStatusBar(self.statusbar) self.SetAutoLayout(True) self.SetSizer(self.box) self.Layout() self.box.SetSizeHints(self) def SetFaded(self, faded): for chn in ['A','B','C','D']: self.tempPanels[chn].SetFaded(faded) def SetScale(self, select): self.scaleSelect.SetSelection(select) def SetLED(self, temps): """ """ #print "Update LED display" units = self.scaleSelect.GetSelection() for chn in ['A','B','C','D']: #self.tempPanels[chn].SetLED(temps[chn]) self.tempPanels[chn].SetLED(self.ScaleTemp(temps[chn], units)) def OnSelection(self, evt): """ Immediately change the display scale when selected. First retrieve current values, then redisplay in new scale. Remember SetLED() expects input in Fahrenheit scale. """ temps = {} type = self.scaleSelect.GetSelection() # the _new_ scale #self.SetScale(type) for chn in ['A','B','C','D']: if type == self.units: continue temp = self.tempPanels[chn].GetValue() # in the _old_ scale if type == self.fahrenheit: temps[chn] = self.ScaleTempFtoC(temp, self.fahrenheit) else: #temps[chn] = self.ScaleTemp(temp, self.celsius) # If new type is not Fahrenheit, then old type was fahrenheit # i.e. temp is already in fahrenheit (as desired) temps[chn] = temp self.units = type if len(temps) > 0: self.SetLED(temps) def RefreshLed(self): """ I admit it - this isn't great! I couldn't find a way to refresh the LED display properly after size events (which is when the display the display can become garbled. Here, we twice toggle the led's "faded" attribute. """ for chn in ['A','B','C','D']: faded = self.tempPanels[chn].led.GetDrawFaded() self.tempPanels[chn].SetFaded(not faded) self.tempPanels[chn].SetFaded(faded) def ScaleTempFtoC(self, val, type): if type == self.celsius: return val else: return str((int(val)*9)/5 + 32) #return str(1 + ((int(val) - 32)*5)/9) def ScaleTemp(self, val, type): if type == self.fahrenheit: return val else: #return str((int(val)*9)/5 + 32) return str(1 + ((int(val) - 32)*5)/9) class ScalePanel(wxPanel): """ """ def __init__(self, parent, select=1, label="Scale"): wxPanel.__init__(self, parent, -1, style=wxSUNKEN_BORDER) # A scale selctor (Celsius or Fahrenheit scale) # self.unitchoices = ['Celsius', 'Fahrenheit'] self.celsius = 0 self.fahrenheit = 1 self.scale = wxRadioBox(self, choices=self.unitchoices, id=ID_UNITS, label='Units', name='ScaleSelector') self.scale.SetSelection(select) wx.EVT_RADIOBOX(self, -1, parent.OnSelection) self.box = wxBoxSizer(wxVERTICAL) self.box.Add(self.scale, 0, wxALIGN_CENTRE|wxALL, border=3) self.SetAutoLayout(True) self.SetSizer(self.box) self.Layout() self.box.SetSizeHints(self) def GetScaleTypes(self): return (self.celsius, self.fahrenheit) def SetSelection(self, select): self.scale.SetSelection(select) def GetSelection(self): return self.scale.GetSelection() class TemperaturePanel(wxPanel): """ """ def __init__(self, parent, log, label="Channel: ", size=LED_SIZES[0]): wxPanel.__init__(self, parent, -1, style=wxSUNKEN_BORDER) self.log = log self.parent = parent self.size = size self.largeView = None self.box = wxBoxSizer(wxVERTICAL) (self.width, self.height) = self.size # Label for this channel # self.channelLabel = wxStaticText(self, -1, "", size=wxSize(self.width, TEXT_PADDING)) self.channelLabel.SetLabel(label) # LED element for this channel # self.led = wxLEDNumberCtrl(self, -1, size=wxSize(self.width, self.height)) self.led.SetAlignment(wxLED_ALIGN_RIGHT) self.led.SetValue("0000") # Pack the items # self.box.Add(self.channelLabel, 0, wxEXPAND|wxALL, border=3) self.box.Add(self.led, 0, wxEXPAND|wxALIGN_CENTRE|wxTOP, border=3) #self.SetAutoLayout(True) self.SetSizer(self.box) #self.Layout() self.box.SetSizeHints(self) EVT_LEFT_DOWN(self.led, self.OnLedClick) def OnLedClick(self, evt): #print "Mouse click: " if not self.largeView: self.largeView = wxFrame(self, -1, self.led.GetValue(), size=LED_SIZES[self.parent.presetLargeSize]) self.largeView.SetTitle(self.channelLabel.GetLabel()) self.largePanel = LargePanel(self.largeView, -1, self.led.GetValue()) self.largeView.Show(1) else: self.largeView.Close() self.largeView = None def SetLED(self, val): try: self.led.SetValue(str(val)) except Exception, e: self.log.exception("In TemperaturePanel.SetLED(): ") if not self.largeView == None: try: self.largePanel.SetValue(str(val)) except Exception, e: self.log.exception("In TemperaturePanel.SetLED(), closing self.largeView") self.largeView = None def GetValue(self): return self.led.GetValue() def SetFaded(self, faded): self.led.SetDrawFaded(faded) if not self.largeView == None: try: self.largePanel.SetFaded(faded) except Exception, e: self.log.exception("In TemperaturePanel.SetFaded()") class LargePanel(wxPanel): """ Panel area for a larger view of the temperature """ def OnLargeSize(self, evt): self.OnSize() def __init__(self, parent, ID, ledVal="0000"): wxPanel.__init__(self, parent, -1, style=wxSUNKEN_BORDER) self.parent = parent self.ledVal = ledVal self.box = wxBoxSizer(wxVERTICAL) self.SetSizer(self.box) EVT_SIZE(self, self.OnLargeSize) self.OnSize() def SetValue(self, val): """ Set the value of the large LED display """ #print "LargeView.SetValue(): ", val self.led.SetValue(val) def SetFaded(self, faded): self.led.SetDrawFaded(faded) def OnSize(self): """ Update the LED aspect ratio when the frame size is changed """ (newX, frameY) = self.parent.GetSize() newY = (4*newX + 80)/11 if not hasattr(self, 'led'): ledText = self.ledVal else: ledText = self.led.GetValue() self.box.Remove(self.led) self.led = None self.led = wxLEDNumberCtrl(self, -1, (newX, newY)) self.led.SetAlignment(wxLED_ALIGN_RIGHT) self.led.SetDrawFaded(False) self.led.SetValue(ledText) self.box.Add(self.led, 1, wxEXPAND|wxALL) self.SetSize((newX, newY)) self.parent.SetSize((newX, newY)) self.SetAutoLayout(True) self.Layout() class SharedThermoClient( wxApp ): """ SharedThermoClient combines a SharedApplication with a ViewerFrame to provide shared access to a thermocouple reader """ def OnInit(self): return 1 def OnExit(self, event): ''' Shut down shared thermocouple reader. ''' self.log.debug("Attempting clean shutdown") self.frame.Close(True) self.sharedAppClient.Shutdown() os._exit(1) def OnTimer(self, event): try: event.Skip() except: pass #print "Timer" self.OnIdle() def OnIdle(self, event=None): try: event.Skip() except: pass if not self.newData: return result = self.GetTempData() #self.log.debug("New temp data via OnIdle()") self.frame.SetLED(result) self.newData = False wxCallAfter(self.frame.statusbar.DataRcvd) def __init__( self, appUrl, name): ''' Creates the shared application client, used for application service interaction, and opens an interface for UI display. ''' wxApp.__init__(self, False) # Create shared application client self.sharedAppClient = SharedAppClient(name) self.log = self.sharedAppClient.InitLogging() self.newData = False # Get client profile try: clientProfileFile = os.path.join(UserConfig.instance().GetConfigDir(), "profile") clientProfile = ClientProfile(clientProfileFile) except: self.log.debug("SharedAppClient.Connect: Could not load client profile, set clientProfile = None") clientProfile = None reactor.interleave(wxCallAfter) # Join the application session. self.sharedAppClient.Join(appUrl) # Register generic data event callback self.sharedAppClient.RegisterEventCallback(Event.APP_SET_DATA , self.GenericDataCB) # Find out the temperature scale being used by server # result = self.sharedAppClient.GetData('temperature_scale') # Need to use this sometime! # Find out the thermocouple types being used by server # thermo_types = self.GetThermoTypes() self.log.info("Thermocouple_Types = %s" % thermo_types) # Find out the contact details of the server thermo_contact = self.sharedAppClient.GetData('thermo_contact') self.log.info("Thermo_Contact = %s" % thermo_contact) # Create Browser Window # self.frame = ViewerFrame(None, -1, self.log, "Thermocouple at %s" % thermo_contact) wx.EVT_MENU(self.frame, ID_EXIT, self.OnExit) self.frame.SetIcon(icons.getAGIconIcon()) self.frame.Show(1) self.SetTopWindow(self.frame) self.frame.statusbar.SetThermoTypes(thermo_types) # Set initial values in the LED display by simulating new # data reception (using the last known values from the server) # self.GenericDataCB(None) self.OnIdle(None) # Set up regular display updates self.timer = wxTimer(self, ID_TIMER_1) self.timer.Start(5000, oneShot=False) EVT_IDLE(self, self.OnIdle) def GetThermoTypes(self): """ In AG3, only simple types may be used with GetData() & SetDate() i.e. we need to fill tuples manually """ thermo_types = [] thermo_types.append(self.sharedAppClient.GetData(VTS_Types.ATYPE)) thermo_types.append(self.sharedAppClient.GetData(VTS_Types.BTYPE)) thermo_types.append(self.sharedAppClient.GetData(VTS_Types.CTYPE)) thermo_types.append(self.sharedAppClient.GetData(VTS_Types.DTYPE)) return thermo_types def GetTempData(self): """ In AG3, only simple types may be used with GetData() & SetDate() i.e. we need to fill tuples manually """ result = {} result['A'] = self.sharedAppClient.GetData(VTS_Temps.ATEMP) result['B'] = self.sharedAppClient.GetData(VTS_Temps.BTEMP) result['C'] = self.sharedAppClient.GetData(VTS_Temps.CTEMP) result['D'] = self.sharedAppClient.GetData(VTS_Temps.DTEMP) return result def GenericDataCB(self, event): """ Callback invoked on notification of some data change. We don't actually do anything, rather allow the LED updates to be done via self.OnIdle() and/or self.OnTimer() """ #self.log.debug("Received DATA callback") self.newData = True class ArgumentManager: def __init__(self): self.arguments = {} self.arguments['applicationUrl'] = None self.arguments['debug'] = 0 def GetArguments(self): return self.arguments def Usage(self): """ How to use the program. """ print "%s:" % sys.argv[0] print " -a|--applicationURL : " print " -d|--debug : " print " -h|--help : " def ProcessArgs(self): """ Handle any arguments we're interested in. """ try: opts, args = getopt.getopt(sys.argv[1:], "a:d:h", ["applicationURL=", "debug", "help"]) except getopt.GetoptError: self.Usage() sys.exit(2) for o, a in opts: if o in ("-a", "--applicationURL"): self.arguments["applicationUrl"] = a elif o in ("-d", "--debug"): self.arguments["debug"] = 1 elif o in ("-h", "--help"): self.Usage() sys.exit(0) if __name__ == "__main__": app = WXGUIApplication() name = "SharedThermoClient" # Parse command line options am = ArgumentManager() am.ProcessArgs() aDict = am.GetArguments() appUrl = aDict['applicationUrl'] debugMode = aDict['debug'] init_args = [] if "--debug" in sys.argv or "-d" in sys.argv: init_args.append("--debug") app.Initialize(name, args=init_args) loglevels = app.GetLogLevels() loglevels.SetLevel(Log.NOTSET, name) loglevels.SetLevel(Log.CRITICAL, Log.EventClient) if not appUrl: am.Usage() else: wxInitAllImageHandlers() sb = SharedThermoClient( appUrl, name) sb.MainLoop() # Changelog # # # 20060823 CKW v.03 # Handle exception when large view has disappeared for any reason # For AG3, # - change data reception format from dictionary (of temps) # and list of probe types to individual data elements, using # VTS_Temps & VTS_Types # - add twisted reactor stuff # - use agversion # # 20050818 CKW v0.2 # Now works properly in MS Windows(TM) # Enable resizing of main panel without led display gliches # # # 2005 CKW # Initial release (v0.1) PK G[5@ VenueThermoClient.appUT DUxPK F5HH $VenueThermoClient.pyUTHDUxPKJ