#! /usr/bin/env python
"""
---------------------------------------------------------------
NAME:
    tlctrl (Traffic light control)

SYNOPSYS:
    tlctrl.py [options]

DESCRIPTION:
    Control simple traffic light in crossroads.

OPTIONS:
    -S    CI4A server IP address or name (constraint)
    -p    CI4A IO port number (constraint)
    -l    logfile path & name

EXAMPLE:
    tlctrl.py -S 10.1.16.68 -p 7500 -l /tmp/ci4adbg.log
        Connect to CI4A driver on 10.1.16.68:7500, write logfile.
"""

__version__="@(#)1.0 $Revision: 1.00 $"

import time
import getopt
import threading
import os
import sys
import socket
import select


class _Options(object):
    """Checks and stores command line options and arguments.

    Attributes set after construction:
        _server   -- value of -S (None if not given)
        _portnum  -- integer value of -p (0 if not given)
        _logname  -- value of -l (None if not given)
        _logwrite -- True only when -l was given
    """

    def __init__(self, commandLine):
        """Initialize option defaults, then parse commandLine.

        commandLine is the argument list without the program name.
        Raises Exception("ARGS_MISSED") when the options are unusable.
        """
        self._server = None
        self._portnum = 0
        self._logname = None
        self._logwrite = False
        self._parseCommandLine(commandLine)

    def _parseCommandLine(self, commandLine):
        """Parse the command line, reading switches and parameters.

        Raises Exception("ARGS_MISSED") when no option is given, an option
        is unknown or malformed, the port is not an integer, or either the
        server or the port is missing.
        """
        try:
            options, _arguments = getopt.getopt(commandLine, "S:p:l:")
        except getopt.GetoptError:
            # Unknown switch or missing value: treat like any other usage
            # error so the caller prints the help text.
            raise Exception("ARGS_MISSED")
        if not options:
            raise Exception("ARGS_MISSED")
        for name, value in options:
            if name == "-S":
                self._server = value
            elif name == "-p":
                try:
                    self._portnum = int(value)
                except ValueError:
                    raise Exception("ARGS_MISSED")
            elif name == "-l":
                self._logwrite = True
                self._logname = value
        if not (self._server and self._portnum):
            raise Exception("ARGS_MISSED")


class _Debug(object):
    """Debugger class: appends text to the logfile named in the options."""

    def __init__(self, options, timewrite):
        """Remember the options and whether entries get a time prefix.

        options   -- object providing _logwrite and _logname
        timewrite -- when true, each entry is prefixed with the local time
        """
        self._timewrite = timewrite
        self._options = options

    def WriteToLog(self, text):
        """Append text to the logfile; do nothing when logging is off.

        The file is opened and closed for every entry. The `with` block
        guarantees the handle is released even if a write fails.
        """
        if not self._options._logwrite:
            return
        with open(self._options._logname, "a") as logfile:
            if self._timewrite:
                logfile.write(time.strftime('%X %x %Z') + " > ")
            logfile.write(text)


class _TrafficLight(object):
    """Traffic light data and statechange times."""

    def __init__(self, Debug):
        """Build the command table and the per-mode schedules."""
        self.Debug = Debug
        # State name -> command string sent to the CI4A driver.
        # NOTE(review): the names look like "<light A>|<light B>" with
        # R/Y/G/N standing for red/yellow/green/none -- confirm.
        self.OutputData = {
            "N|N": "S0OB01V000",
            "R|G": "S0OB01V033",
            "R|Y": "S0OB01V034",
            "R|R": "S0OB01V036",
            "RY|R": "S0OB01V052",
            "G|R": "S0OB01V012",
            "Y|R": "S0OB01V020",
            "R|RY": "S0OB01V038",
            "Y|Y": "S0OB01V018",
        }
        # Each schedule maps a time code to the state entered at that time
        # code; time codes without an entry leave the output untouched.
        self.Normal = {
            0: "R|G",
            10: "R|Y",
            12: "R|R",
            13: "RY|R",
            14: "G|R",
            25: "Y|R",
            26: "R|R",
            27: "R|RY",
        }
        self.Semi = {
            0: "N|N",
            1: "Y|Y",
        }
        self.Off = {
            0: "N|N",
            1: "N|N",
        }

    def getoutputdata(self, TCRindex, Mode):
        """Return the command for time code TCRindex in schedule Mode.

        Mode is the name of a schedule attribute ('Normal', 'Semi', 'Off').
        Returns "DO_NOTHING" when the schedule has no entry for TCRindex.
        """
        schedule = getattr(self, Mode)
        state = schedule.get(TCRindex, "NOTHING")
        return self.OutputData.get(state, "DO_NOTHING")


class _WorkingThread(threading.Thread):
    """Sends light commands and reacts to commands received from CI4A."""

    # Received command -> mode to switch to.
    _MODE_COMMANDS = {
        'S0IB01V001\r': 'Normal',
        'S0IB01V002\r': 'Semi',
        'S0IB01V004\r': 'Off',
    }
    # Received command that ends the working loop.
    _STOP_COMMAND = 'S0IB01V008\r'

    def __init__(self, TCR, TrafficLight, CI4AComm, Debug):
        """Store collaborators; the thread starts in mode 'Off'."""
        threading.Thread.__init__(self)
        self.RunFlag = True
        self.Mode = 'Off'
        self.TCR = TCR
        self.TrafficLight = TrafficLight
        self.CI4AComm = CI4AComm
        self.Debug = Debug
        self.Action = ''  # last command sent, used to suppress repeats
        self.Debug.WriteToLog("working thread started\n")

    def run(self):
        """Poll every 0.1 s until RunFlag is cleared.

        RunFlag is always cleared on exit, including when an exception
        ends the thread, so a caller polling RunFlag cannot wait forever
        on a dead thread.
        """
        try:
            while self.RunFlag:
                self._sendaction()
                self._handleinput()
                time.sleep(0.1)
        finally:
            self.RunFlag = False

    def _sendaction(self):
        """Send the command for the current time code if it changed."""
        Action = self.TrafficLight.getoutputdata(self.TCR.gettimecode(),
                                                 self.Mode)
        if Action == "DO_NOTHING" or Action == self.Action:
            return
        self.Debug.WriteToLog("sended command to CI4A: %s\n" % Action)
        self.CI4AComm.send(Action + '\r')
        self.Action = Action

    def _handleinput(self):
        """Consume pending input from CI4A and apply mode/stop commands."""
        if not self.CI4AComm.getinputflag():
            return
        Input = self.CI4AComm.getinputdata()
        self.CI4AComm.clearinputflag()
        self.Debug.WriteToLog('received command from CI4A: %s\n' % Input)
        if Input in self._MODE_COMMANDS:
            self.setmode(self._MODE_COMMANDS[Input])
        elif Input == self._STOP_COMMAND:
            self.RunFlag = False

    def stop(self):
        """Ask the working loop to finish."""
        self.RunFlag = False

    def setmode(self, Mode):
        """Switch schedule and restart the time code with a matching reload.

        'Normal' uses a reload value of 29, 'Semi' and 'Off' use 2.
        """
        self.Mode = Mode
        self.Debug.WriteToLog("change mode to %s\n" % Mode)
        if self.Mode == 'Normal':
            self.TCR.changereload(29)
        elif self.Mode in ('Semi', 'Off'):
            self.TCR.changereload(2)


class _TCRThread(threading.Thread):
    """Time code generator: a counter that advances once per second."""

    def __init__(self, debug):
        """Start at time code 0 with a reload value of 100."""
        threading.Thread.__init__(self)
        self.RunFlag = True
        self.TCR = 0
        self.Reload = 100
        self.Debug = debug
        self.Debug.WriteToLog("time code generator thread started\n")

    def run(self):
        """Increment TCR every second, wrapping to 0 when it reaches Reload."""
        while self.RunFlag:
            self.TCR += 1
            if self.Reload <= self.TCR:
                self.TCR = 0
            time.sleep(1)

    def stop(self):
        """Ask the counter loop to finish."""
        self.RunFlag = False

    def changereload(self, Value):
        """Reset the time code to 0 and set the wrap-around value."""
        self.TCR = 0
        self.Reload = Value

    def gettimecode(self):
        """Return the current time code."""
        return self.TCR


class _CI4ACommThread(threading.Thread):
    """TCP connection to the CI4A server plus a receiving thread.

    NOTE(review): received data is handed over through a single
    Input/InputFlag slot and read in chunks of up to 20 bytes, so a command
    split across reads or two commands arriving together will not match the
    exact strings the working thread compares against -- confirm that the
    CI4A side sends one short command at a time.
    """

    def __init__(self, options, debug):
        """Connect to options._server:options._portnum.

        On failure the socket is closed, the error is logged and
        Exception("ERROR_IN_LOG") is raised.
        """
        threading.Thread.__init__(self)
        self.RunFlag = True
        self.InputFlag = False
        self.Input = ''  # defined up front so getinputdata() never fails
        self.Comm = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.Comm.connect((options._server, options._portnum))
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must pass.
            self.Comm.close()
            debug.WriteToLog("CI4A server not available on %s:%d\n"
                             % (options._server, options._portnum))
            debug.WriteToLog("tlctrl stopped with error\n\n")
            raise Exception("ERROR_IN_LOG")
        debug.WriteToLog("CI4A communication thread started\n")

    def run(self):
        """Receive data until stopped or the peer closes the connection.

        The socket is closed when the loop ends for any reason.
        """
        try:
            while self.RunFlag:
                readable, _, _ = select.select([self.Comm], [], [], 0.01)
                for sock in readable:
                    data = sock.recv(20)
                    if not data:
                        # Empty read means the peer closed the connection;
                        # without this the socket stays readable forever
                        # and the loop would spin.
                        self.RunFlag = False
                        break
                    if not isinstance(data, str):
                        # Python 3 delivers bytes; callers compare text.
                        data = data.decode("latin-1")
                    self.Input = data
                    self.InputFlag = True
        finally:
            self.Comm.close()

    def stop(self):
        """Ask the receive loop to finish."""
        self.RunFlag = False

    def send(self, text):
        """Send text to the CI4A server.

        sendall is used because a plain send may transmit only part of
        the data.
        """
        if not isinstance(text, bytes):
            text = text.encode("latin-1")
        self.Comm.sendall(text)

    def getinputflag(self):
        """Return True when unread input is available."""
        return self.InputFlag

    def clearinputflag(self):
        """Mark the current input as consumed."""
        self.InputFlag = False

    def getinputdata(self):
        """Return the most recently received data as text."""
        return self.Input

    def disconnect(self):
        """Stop receiving and close the connection.

        While the receive loop is running it closes the socket itself on
        exit; closing it here as well could invalidate the descriptor in
        the middle of a select call.
        """
        self.RunFlag = False
        if not self.is_alive():
            self.Comm.close()


def main(argv=None):
    """Main program: run the controller and return the process exit code.

    argv is the argument list without the program name (defaults to
    sys.argv[1:]). Returns 0 on normal stop, 1 on a usage error (the help
    text is printed) and 2 on an error recorded in the debug log. Any
    other exception is re-raised instead of being hidden behind exit
    code 0.
    """
    if argv is None:
        argv = sys.argv[1:]
    exitCode = 0
    started = []
    try:
        try:
            Options = _Options(argv)
            Debug = _Debug(Options, True)
            Debug.WriteToLog('tlctrl started\n')
            TrafficLight = _TrafficLight(Debug)
            CI4AComm = _CI4ACommThread(Options, Debug)
            started.append(CI4AComm)
            CI4AComm.start()
            TCR = _TCRThread(Debug)
            started.append(TCR)
            TCR.start()
            WORKING = _WorkingThread(TCR, TrafficLight, CI4AComm, Debug)
            started.append(WORKING)
            WORKING.start()
            connectionLost = False
            while WORKING.RunFlag:
                if not CI4AComm.is_alive():
                    # The receive thread only ends on its own when the
                    # connection is gone.
                    connectionLost = True
                    break
                time.sleep(0.1)
        finally:
            # Always release the helper threads, otherwise the process
            # would hang after Ctrl-C or an unexpected error.
            for thread in started:
                thread.stop()
        if connectionLost:
            Debug.WriteToLog("connection to CI4A server lost\n")
            Debug.WriteToLog("tlctrl stopped with error\n\n")
            raise Exception("ERROR_IN_LOG")
        Debug.WriteToLog('tlctrl stopped normally\n\n')
    except Exception as e:
        if "ARGS_MISSED" in e.args:
            exitCode = 1
            print("")
            print(__doc__)
            print("")
        elif "ERROR_IN_LOG" in e.args:
            exitCode = 2
            print("Error, tlctrl stopped, see debug log for details!")
        else:
            raise
    return exitCode


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))