The Open Lighting Project has moved!
We've launched our new site at www.openlighting.org. This wiki will remain and be updated with more technical information.
We've launched our new site at www.openlighting.org. This wiki will remain and be updated with more technical information.
Difference between revisions of "OlaLED"
From wiki.openlighting.org
Line 254: | Line 254: | ||
nano /srv/dmx/ola_send_dmx.py | nano /srv/dmx/ola_send_dmx.py | ||
+ | |||
+ | |||
+ | Writted by David Magniez pubmag at free dot fr |
Latest revision as of 13:48, 1 November 2010
Introduction
In this topic, you will learn how to install a python script to permit to control a LED / RGB system via webcontrol (or other python script)
prerequisites
You need to have a fully functional ola system
Install script
create a directory to store all these new functionnality
mkdir -p /srv/dmx/
write a this new python script in this folder :
nano /srv/dmx/server.py
#!/usr/bin/python import SocketServer import threading import time from ola_send_dmx import * my_thread = 0 my_command = 'start' actual_program = 'color' ## program wanted param1 = 'red' ## first param intensity = 100 ## current intensity actual_fixed_color = 0 ## if fixed color choice, put args here to remember (if having to change intensity) R_before = 0 # current color state G_before = 0 B_before = 0 pitch = 1 class MyTCPHandler(SocketServer.BaseRequestHandler): """ The RequestHandler class for our server. It is instantiated once per connection to the server, and must override the handle() method to implement communication to the client. """ def handle(self): # self.request is the TCP socket connected to the client global my_thread self.data = self.request.recv(1024).strip() # was a test #if self.data == 'reset' : # global my_command # my_command = 'reset' #else : command = self.data.split() command = command[0] if command == 'intensity': # We separe intensity because should lauch in parallel another_thread = Thread_dmx(self.data) # thread named different for able controlling mythread another_thread.start() print "thread intensite lance" # just send back the same data, but upper-cased self.request.send(self.data.upper()) else: if my_thread != 0 : #if thread is already launched and is not about intensity try: my_thread.stop() #stop thread my_thread.join() #stop program to be sure thread is stopped except NameError: print "An error occured when trying to stop previous thread." print "launching thread " + self.data my_thread = Thread_dmx(self.data) my_thread.start() # just send back the same data to the client, but upper-cased self.request.send(self.data.upper()) class Thread_dmx(threading.Thread): def __init__(self, nom = ''): threading.Thread.__init__(self) self.nom = nom self.Terminated = False def run(self): global actual_program global intensity global fixed_color ## separate command and arguments ## words = self.nom.split() if len(words) == 0: print('Error: empty request\n') return command = words[0] args = words[1:] methodname = 'do_' + command ## relauch sending DMX for fixed color, if intensity changed if command=='intensity': intensity = abs(int(args[0])) if intensity > 100: return if actual_program == 'fade': method = getattr(self, 'do_fade') method(*fixed_color) else: actual_program = command ## take care of inexistent function if not hasattr(self, methodname): print('Error: ',command,' is not a valid command') return ## save color if fixed color if command=='fade': fixed_color = args ## launch corresponding function method = getattr(self, methodname) method(*args) ## Stop the thread if called def stop(self): self.Terminated = True ## Go to specified DMX adresses from previous adresses in fade def do_fade(self,R_after,G_after,B_after,speed): global R_before global G_before global B_before global intensity R_after=int(round((int(R_after)*intensity)/100)) G_after=int(round((int(G_after)*intensity)/100)) B_after=int(round((int(B_after)*intensity)/100)) if(R_after==R_before and G_after==G_before and B_after==B_before): print('Etat deja atteint') return speed=float(speed) actual_intensity = intensity time_to_sleep=(speed/max(abs(R_before-R_after),abs(G_before-G_after),abs(B_before-B_after))) # count to know wich pitch to apply (processor free) speed_reference=float(0.01) print(time_to_sleep) pitch=int(speed_reference/time_to_sleep) print(pitch) if pitch == 0: pitch = int(1) print(pitch) ## while ending color are not reached or no order for stopping, increment data while (R_before-R_after!=0 or G_before-G_after!=0 or B_before-B_after!=0) and (not self.Terminated) : ## If intensity was changed, take care of this if actual_intensity != intensity: R_after=int(round((int(R_after)*intensity)/100)) G_after=int(round((int(G_after)*intensity)/100)) B_after=int(round((int(B_after)*intensity)/100)) actual_intensity = intensity R_diff=R_before-R_after G_diff=G_before-G_after B_diff=B_before-B_after if R_diff <= -pitch : R_before += pitch elif R_diff >= pitch : R_before-= pitch else : R_before = R_after if G_diff <= -pitch : G_before += pitch elif G_diff >= pitch : G_before -= pitch else : G_before=G_after if B_diff <= -pitch : B_before += pitch elif B_diff >=pitch : B_before -= pitch else : B_before=B_after #print('AFTER : ',R_after,G_after,B_after) print('values =',R_before,G_before,B_before) print('sleep =',time_to_sleep*pitch) print('pitch =',pitch) dmx_send(R_before,G_before,B_before) time.sleep(time_to_sleep*pitch) ## make a chase from red to blue, indefinitely def do_chase(self,speed): intensity = 255 speed = float(speed) while not self.Terminated : self.do_fade(intensity,0,intensity,speed) print("7") print("1") self.do_fade(intensity,0,0,speed) print("2") self.do_fade(intensity,intensity,0,speed) print("3") self.do_fade(0,intensity,0,speed) print("4") self.do_fade(0,intensity,intensity,speed) print("5") self.do_fade(0,0,intensity,speed) print("6") #self.do_fade(intensity,0,intensity,speed) #print("7") if __name__ == "__main__": HOST, PORT = "localhost", 9999 # Create the server, binding to localhost on port 9999 server = SocketServer.TCPServer((HOST, PORT), MyTCPHandler) # Activate the server; this will keep running until you # interrupt the program with Ctrl-C server.serve_forever()
copy client_wrapper.py in this folder :
cp /usr/src/ola-0.7.4/python/examples/client_wrapper.py /srv/dmx/
create a new python file function : ola_send_dmx.py
nano /srv/dmx/ola_send_dmx.py
Writted by David Magniez pubmag at free dot fr