    IO Rodeo Forum

    Live Plot of CV Data

    Rodeostat
    • S
      Steffen last edited by

      Hi

      Is there an easy way to plot the Rodeostat's measurement data live while running, e.g., a CV measurement? Note that I have no real experience with Python programming and am only a beginner with Arduino programming.
      I have managed to build a data collection system with live plotting for our old manual potentiostat/galvanostat using Arduino and C#, but I chose to order the Rodeostat rather than develop a similar microcontroller-based system myself.

      Best Regards
      Steffen Vindt

      • W
        Will Dickson @Steffen last edited by Will Dickson

        @Steffen

        Yes. The easiest way is probably to create a subclass of the Potentiostat class with its own run_test method which creates the live plot instead of the progress bar. I give a simple example below.

        from __future__ import print_function
        import json
        import threading
        try:
            import queue as Queue  # Python 3 name for the module
        except ImportError:
            import Queue  # Python 2
        import matplotlib.pyplot as plt
        import potentiostat as ps
        
        
        class PotentiostatLivePlot(ps.Potentiostat):
            """
            Custom version of potentiostat which displays a live plot when running a test.
            """
        
            def __init__(self, port, timeout=10.0, debug=False):
                super(PotentiostatLivePlot,self).__init__(port,timeout=timeout,debug=debug)
                self.data_queue = Queue.Queue()
        
        
            def get_default_volt_lims(self):
                volt_range = self.get_volt_range()
                volt_range_value = float(volt_range[:-1])
                volt_lims = -volt_range_value, volt_range_value
                return volt_lims
        
        
            def get_default_curr_lims(self):
                curr_range = self.get_curr_range()
                curr_range_value = float(curr_range[:-2])
                curr_lims = -curr_range_value, curr_range_value
                return curr_lims
        
        
            def get_curr_unit(self):
                curr_range = self.get_curr_range()
                curr_unit = curr_range[-2:]
                return curr_unit
        
        
            def receive_data(self,timeunit):
                while True:
                    # Get json data from the device
                    sample_json = self.readline()
                    sample_json = sample_json.strip()
                    try:
                        sample_dict = json.loads(sample_json.decode())
                    except ValueError:
                        continue
                    # Put new values in data queue
                    if len(sample_dict) > 0:
                        tval = sample_dict[ps.TimeKey]*ps.TimeUnitToScale[timeunit]
                        volt = sample_dict[ps.VoltKey]
                        curr = sample_dict[ps.CurrKey]
                        self.data_queue.put({'tval': tval, 'volt': volt, 'curr': curr})
                    else:
                        # Empty dict marks the end of the test; stop reading
                        self.data_queue.put({})
                        break
        
        
        
            def run_test(self, testname, param=None, filename=None, volt_lims=None, curr_lims=None, timeunit='s'):
        
                if timeunit not in ps.TimeUnitToScale:
                    raise RuntimeError('unknown timeunit option {0}'.format(timeunit))
        
                if volt_lims is None:
                    volt_lims = self.get_default_volt_lims()
        
                if curr_lims is None:
                    curr_lims = self.get_default_curr_lims()
        
                curr_unit = self.get_curr_unit() 
        
                if param is not None:
                    self.set_param(testname,param)
        
                if filename is not None:
                    fid = open(filename,'w')
        
                t_done = self.get_test_done_time(testname, timeunit=timeunit)
        
                # Create figure and live plot
                fig = plt.figure()
                plt.ion()
        
                volt_axes = plt.subplot(211)
                volt_line, = plt.plot([0],[0],'b')
                plt.xlim(0,t_done)
                plt.ylim(*volt_lims)
                plt.grid(True)
                plt.ylabel('(V)')
                plt.title('Potentiostat Live Plot')
        
                curr_axes = plt.subplot(212,sharex=volt_axes)
                curr_line, = plt.plot([0],[0],'b')
                plt.xlim(0,t_done)
                plt.ylim(*curr_lims)
                plt.grid(True)
                plt.ylabel('({0})'.format(curr_unit))
                plt.xlabel('({0})'.format(timeunit))
        
                fig.canvas.flush_events()
                plt.pause(0.001)
        
                # Create lists for storing incoming data
                tval_list = [] 
                volt_list = [] 
                curr_list = []
        
                # Set up worker thread to collect data from device
                data_worker = threading.Thread(target=self.receive_data,args=(timeunit,))
                data_worker.daemon = True
        
                # Send command to run the test
                cmd_dict = {
                        ps.CommandKey: ps.RunTestCmd, 
                        ps.TestKey: testname
                        }
                msg_dict = self.send_cmd(cmd_dict)
                self.test_running = True
        
                data_worker.start()
                done = False
        
                while not done:
        
                    # Get available data from data queue and add to list
                    while self.data_queue.qsize() > 0:
                        data = self.data_queue.get(True)
                        if not data:
                            done = True # Empty data means run is complete
                            break
        
                        # Get data items and append to lists
                        tval = data['tval']
                        volt = data['volt']
                        curr = data['curr']
                        tval_list.append(tval)
                        volt_list.append(volt)
                        curr_list.append(curr)
        
                        # Write data to file
                        if filename is not None:
                            fid.write('{0:1.3f}, {1:1.4f}, {2:1.4f}\n'.format(tval,volt,curr))
        
                        print('{0:1.3f}, {1:1.4f}, {2:1.4f}'.format(tval,volt,curr))
        
                    # Update live plot
                    volt_line.set_xdata(tval_list)
                    volt_line.set_ydata(volt_list)
                    curr_line.set_xdata(tval_list)
                    curr_line.set_ydata(curr_list)
                    fig.canvas.flush_events()
                    plt.pause(0.001)
        
                self.test_running = False
        
                if filename is not None:
                    fid.close()
        
                return tval_list, volt_list, curr_list 
        

         

        Assuming you have saved this as a new module called potentiostat_live_plot.py, you can call it as follows.

         

        from potentiostat_live_plot import PotentiostatLivePlot
        
        dev = PotentiostatLivePlot(port='/dev/ttyACM0')
        
        dev.set_curr_range('100uA')
        dev.set_sample_period(10)
        
        name = 'cyclic'
        param = {
                'quietValue' : 0.0,
                'quietTime'  : 1000,
                'amplitude'  : 2.0,
                'offset'     : 0.0,
                'period'     : 1000,
                'numCycles'  : 15,
                'shift'      : 0.0,
                }
        
        t, volt, curr = dev.run_test(name,param,filename='data.txt', curr_lims=(-50,50))
        
        try:
            input = raw_input  # Python 2
        except NameError:
            pass  # Python 3: input() already returns a string
        input('Press enter to quit')
        

         

        The live plot should look something like the image below.

         

        live plot image

         

        In this example there are two plots which are updated in real time: one showing the output voltage vs. time and the other showing the current vs. time.
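
Because run_test also writes every sample to the file passed as filename, a finished run can be reloaded later for offline plotting. The following is only a minimal sketch: load_run is a hypothetical helper name (it is not part of the potentiostat library), and it assumes the file holds one "time, voltage, current" triple per line, as produced by the fid.write call in the script above.

```python
def load_run(filename):
    """Read 'time, volt, curr' lines written by run_test into three lists.

    Hypothetical helper; assumes comma-separated floats, one sample per line.
    """
    tval_list, volt_list, curr_list = [], [], []
    with open(filename) as fid:
        for line in fid:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            tval, volt, curr = [float(x) for x in line.split(',')]
            tval_list.append(tval)
            volt_list.append(volt)
            curr_list.append(curr)
    return tval_list, volt_list, curr_list
```

After a run with filename='data.txt', calling load_run('data.txt') should give back lists equivalent to those returned by run_test, rounded to the precision used when writing the file.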

        • S
          Steffen last edited by

          Hi Will

          Thanks a lot 🙂
          How should I change "potentiostat_live_plot.py" to plot a classical CV with the potential on the x-axis and the current on the y-axis?

          Best Regards
          Steffen

          • W
            Will Dickson @Steffen last edited by Will Dickson

            @Steffen

            Here is a modified version which produces the classical CV-type plot.

            from __future__ import print_function
            import json
            import threading
            try:
                import queue as Queue  # Python 3 name for the module
            except ImportError:
                import Queue  # Python 2
            import matplotlib.pyplot as plt
            import potentiostat as ps
            
            
            class PotentiostatLivePlot(ps.Potentiostat):
                """
                Custom version of potentiostat which displays a live plot when running a test.
                """
            
                def __init__(self, port, timeout=10.0, debug=False):
                    super(PotentiostatLivePlot,self).__init__(port,timeout=timeout,debug=debug)
                    self.data_queue = Queue.Queue()
            
            
                def get_default_volt_lims(self):
                    volt_range = self.get_volt_range()
                    volt_range_value = float(volt_range[:-1])
                    volt_lims = -volt_range_value, volt_range_value
                    return volt_lims
            
            
                def get_default_curr_lims(self):
                    curr_range = self.get_curr_range()
                    curr_range_value = float(curr_range[:-2])
                    curr_lims = -curr_range_value, curr_range_value
                    return curr_lims
            
            
                def get_curr_unit(self):
                    curr_range = self.get_curr_range()
                    curr_unit = curr_range[-2:]
                    return curr_unit
            
            
                def receive_data(self,timeunit):
                    while True:
                        # Get json data from the device
                        sample_json = self.readline()
                        sample_json = sample_json.strip()
                        try:
                            sample_dict = json.loads(sample_json.decode())
                        except ValueError:
                            continue
                        # Put new values in data queue
                        if len(sample_dict) > 0:
                            tval = sample_dict[ps.TimeKey]*ps.TimeUnitToScale[timeunit]
                            volt = sample_dict[ps.VoltKey]
                            curr = sample_dict[ps.CurrKey]
                            self.data_queue.put({'tval': tval, 'volt': volt, 'curr': curr})
                        else:
                            # Empty dict marks the end of the test; stop reading
                            self.data_queue.put({})
                            break
            
            
            
                def run_test(self, testname, param=None, filename=None, volt_lims=None, curr_lims=None, timeunit='s'):
            
                    if timeunit not in ps.TimeUnitToScale:
                        raise RuntimeError('unknown timeunit option {0}'.format(timeunit))
            
                    if volt_lims is None:
                        volt_lims = self.get_default_volt_lims()
            
                    if curr_lims is None:
                        curr_lims = self.get_default_curr_lims()
            
                    curr_unit = self.get_curr_unit() 
            
                    if param is not None:
                        self.set_param(testname,param)
            
                    if filename is not None:
                        fid = open(filename,'w')
            
                    t_done = self.get_test_done_time(testname, timeunit=timeunit)
            
                    # Create figure and live plot
                    fig = plt.figure()
                    plt.ion()
            
                    ax = plt.subplot(111)
                    line, = plt.plot([0],[0],'b')
                    plt.xlim(*volt_lims)
                    plt.ylim(*curr_lims)
                    plt.grid(True)
                    plt.xlabel('(V)')
                    plt.ylabel('({0})'.format(curr_unit))
                    plt.title('Potentiostat Live Plot')
            
                    fig.canvas.flush_events()
                    plt.pause(0.001)
            
                    # Create lists for storing incoming data
                    tval_list = [] 
                    volt_list = [] 
                    curr_list = []
            
                    # Set up worker thread to collect data from device
                    data_worker = threading.Thread(target=self.receive_data,args=(timeunit,))
                    data_worker.daemon = True
            
                    # Send command to run the test
                    cmd_dict = {
                            ps.CommandKey: ps.RunTestCmd, 
                            ps.TestKey: testname
                            }
                    msg_dict = self.send_cmd(cmd_dict)
                    self.test_running = True
            
                    data_worker.start()
                    done = False
            
                    while not done:
            
                        # Get available data from data queue and add to list
                        while self.data_queue.qsize() > 0:
                            data = self.data_queue.get(True)
                            if not data:
                                done = True # Empty data means run is complete
                                break
            
                            # Get data items and append to lists
                            tval = data['tval']
                            volt = data['volt']
                            curr = data['curr']
                            tval_list.append(tval)
                            volt_list.append(volt)
                            curr_list.append(curr)
            
                            # Write data to file
                            if filename is not None:
                                fid.write('{0:1.3f}, {1:1.4f}, {2:1.4f}\n'.format(tval,volt,curr))
            
                            print('{0:1.3f}, {1:1.4f}, {2:1.4f}'.format(tval,volt,curr))
            
                        # Update live plot
                        line.set_xdata(volt_list)
                        line.set_ydata(curr_list)
                        fig.canvas.flush_events()
                        plt.pause(0.001)
            
                    self.test_running = False
            
                    if filename is not None:
                        fid.close()
            
                    return tval_list, volt_list, curr_list 
            

             
            Note, this version plots the potential and current during the quiet period as well as during the test. However, it should be pretty easy to modify if you don't want to show the data during the quiet period. One option is to discard the incoming data (i.e., not append it to the lists) until the quiet period is complete.
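
A rough sketch of that filtering step follows. drop_quiet_period is a hypothetical helper, not part of the potentiostat library, and it assumes the quiet time has already been converted to the same unit as tval. The conversion shown assumes that the 'quietTime' value of 1000 in the example parameters is in milliseconds while tval is in seconds; check this against the library documentation before relying on it.

```python
def drop_quiet_period(samples, quiet_time):
    """Return only the samples recorded after the quiet period.

    samples    -- iterable of dicts with 'tval', 'volt' and 'curr' keys
    quiet_time -- quiet period length, in the same unit as 'tval'
    """
    return [s for s in samples if s['tval'] > quiet_time]


samples = [
    {'tval': 0.50, 'volt': 0.0, 'curr': 0.1},
    {'tval': 0.99, 'volt': 0.0, 'curr': 0.1},
    {'tval': 1.50, 'volt': 0.4, 'curr': 2.0},
]

# Assumption: 'quietTime' is given in ms, tval in s
quiet_time_s = 1000 / 1000.0
kept = drop_quiet_period(samples, quiet_time_s)
```

Inside run_test the same test could be applied per sample, skipping the append (and the file write, if desired) while tval is still within the quiet period.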

            • pguillem
              pguillem last edited by pguillem

              Another way would be to raise an async event each time a new value is read from the serial port. (It would be triggered "sample_rate" times per second, as each value is received from the Teensy.)

              That approach could enable any component to register for the event and update itself with each new value, without making the class responsible for plotting. Just thinking out loud.

              Right around here:

                      if len(sample_dict) > 0:
                          tval = sample_dict[ps.TimeKey]*ps.TimeUnitToScale[timeunit]
                          volt = sample_dict[ps.VoltKey]
                          curr = sample_dict[ps.CurrKey]
                          self.data_queue.put({'tval': tval, 'volt': volt, 'curr': curr})
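
To make the idea concrete, here is a minimal sketch of such an event as a plain callback list. SampleEvent, subscribe and emit are hypothetical names chosen for illustration; nothing like this is known to exist in the potentiostat library. In receive_data, the emit call would sit next to (or replace) the data_queue.put call quoted above.

```python
class SampleEvent(object):
    """Minimal publish/subscribe hook for new samples (hypothetical)."""

    def __init__(self):
        self._handlers = []

    def subscribe(self, handler):
        """Register a callable taking (tval, volt, curr)."""
        self._handlers.append(handler)

    def emit(self, tval, volt, curr):
        """Call every registered handler with the new sample."""
        for handler in list(self._handlers):
            handler(tval, volt, curr)


# Two independent consumers of the same sample stream
received = []
peak = {'curr': 0.0}


def store(tval, volt, curr):
    received.append((tval, volt, curr))


def track_peak(tval, volt, curr):
    peak['curr'] = max(peak['curr'], abs(curr))


on_sample = SampleEvent()
on_sample.subscribe(store)
on_sample.subscribe(track_peak)

# Simulated samples; a real device would call emit from its reader loop
on_sample.emit(0.01, 0.10, 1.5)
on_sample.emit(0.02, 0.20, -4.0)
```

One caveat with this design: the handlers run in whichever thread calls emit. If that is the serial reader thread, GUI and plotting code generally still needs to hand the data over to the main thread, for example through a queue.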
              

              I'll work on some code during March and commit it to the blog. I'll also work on some bindings to store long series in DB tables instead of datafile.txt.

              Question... can the iorodeo repos be forked with Git? Cheers.

              • Pedro
              • W
                Will Dickson @pguillem last edited by Will Dickson

                @pguillem

                I think separating the plotting from the class is a nice idea, and I like the idea of using an event. You might even get away with just the data queue (and no event) ... I hacked together a simple example below.

                from __future__ import print_function
                import json
                import threading
                try:
                    import queue as Queue  # Python 3 name for the module
                except ImportError:
                    import Queue  # Python 2
                import potentiostat as ps
                
                
                class PotentiostatWithQueue(ps.Potentiostat):
                
                    def __init__(self, port, timeout=10.0, debug=False):
                        super(PotentiostatWithQueue,self).__init__(port,timeout=timeout,debug=debug)
                        self.data_queue = Queue.Queue()
                
                    def receive_data(self,timeunit):
                
                        while self.test_running:
                            # Get json data from the device
                            sample_json = self.readline()
                            sample_json = sample_json.strip()
                            try:
                                sample_dict = json.loads(sample_json.decode())
                            except ValueError:
                                continue
                            # Put new values in data queue
                            if len(sample_dict) > 0:
                                tval = sample_dict[ps.TimeKey]*ps.TimeUnitToScale[timeunit]
                                volt = sample_dict[ps.VoltKey]
                                curr = sample_dict[ps.CurrKey]
                                self.data_queue.put({'tval': tval, 'volt': volt, 'curr': curr})
                            else:
                                self.data_queue.put({})
                                self.test_running = False
                
                    def run_test(self, testname, param=None, timeunit='s'):
                
                        if timeunit not in ps.TimeUnitToScale:
                            raise RuntimeError('unknown timeunit option {0}'.format(timeunit))
                        if param is not None:
                            self.set_param(testname,param)
                
                        # Set up worker thread to collect data from device
                        data_worker = threading.Thread(target=self.receive_data,args=(timeunit,))
                        data_worker.daemon = True
                        self.data_queue = Queue.Queue()
                
                        # Send command to run the test
                        cmd_dict = {
                                ps.CommandKey: ps.RunTestCmd, 
                                ps.TestKey: testname
                                }
                        msg_dict = self.send_cmd(cmd_dict)
                        self.test_running = True
                        data_worker.start()
                

                 
                Then some other part of your program could pull data from the queue as it comes in, and you could plot it as you want. Here is a quick and dirty example. I'm assuming the above snippet is saved as potentiostat_with_queue.py.

                from __future__ import print_function
                try:
                    import queue as Queue  # Python 3 name for the module
                except ImportError:
                    import Queue  # Python 2
                from potentiostat_with_queue import PotentiostatWithQueue
                import matplotlib.pyplot as plt
                
                volt_lims = (-2.1,2.0)
                curr_lims = (-50,50)
                
                dev = PotentiostatWithQueue(port='/dev/ttyACM0')
                
                dev.set_curr_range('100uA')
                dev.set_sample_period(10)
                
                name = 'cyclic'
                param = {
                        'quietValue' : 0.0,
                        'quietTime'  : 1000,
                        'amplitude'  : 2.0,
                        'offset'     : 0.0,
                        'period'     : 2000,
                        'numCycles'  : 2,
                        'shift'      : 0.0,
                        }
                
                dev.set_param(name,param)
                t_done = dev.get_test_done_time(name)
                
                # Create figure and live plot
                fig = plt.figure()
                plt.ion()
                
                ax = plt.subplot(111)
                line, = plt.plot([0],[0],'b')
                plt.xlim(*volt_lims)
                plt.ylim(*curr_lims)
                plt.grid(True)
                plt.xlabel('(V)')
                plt.ylabel('(uA)')
                plt.title('Potentiostat Live Plot')
                
                fig.canvas.flush_events()
                plt.pause(0.001)
                
                # Lists for incoming data
                tval_list = [] 
                volt_list = [] 
                curr_list = []
                
                # Start test
                done = False
                dev.run_test(name)
                
                while not done:
                
                    # Get data from queue and store it for plotting
                    have_new_data = False
                    while True:
                
                        try:
                            data = dev.data_queue.get(False)
                        except Queue.Empty:
                            break
                
                        if data:
                            tval = data['tval']
                            volt = data['volt']
                            curr = data['curr']
                            print('{0:1.3f}, {1:1.4f}, {2:1.4f}'.format(tval,volt,curr))
                            tval_list.append(tval)
                            volt_list.append(volt)
                            curr_list.append(curr)
                            have_new_data = True
                        else:
                            done = True
                            break
                
                    # Update live plot
                    if have_new_data:
                        line.set_xdata(volt_list)
                        line.set_ydata(curr_list)
                        fig.canvas.flush_events()
                        plt.pause(0.001)
                
                try:
                    input = raw_input  # Python 2
                except NameError:
                    pass  # Python 3: input() already returns a string
                input('done')
                

                 
                In a GUI you might use something a bit different from the while loop shown in the simple example above, maybe a timer callback, i.e., something which sets the rate at which you update your plot. I'm guessing you won't always want to update the plot as frequently as data comes in from the device; it depends on the sample frequency (or period). The sample rate can be anything from very slow up to 1000 Hz, which might be a bit fast for updating the plot in the GUI (or at least there is no reason to update the plot that fast). So the idea would be to empty the data queue (which could hold multiple samples) and then update the plot once in each timer callback.
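
The timer-callback idea can be sketched roughly as follows. drain_queue and on_timer are hypothetical helper names, and the redraw itself is left as a comment because it depends on the GUI toolkit. With matplotlib, one option would be a timer from fig.canvas.new_timer(interval=100) with on_timer registered through its add_callback method; that wiring is an untested suggestion here, not something taken from the scripts above.

```python
try:
    import queue as Queue  # Python 3
except ImportError:
    import Queue  # Python 2


def drain_queue(data_queue):
    """Remove and return everything currently waiting in the queue."""
    items = []
    while True:
        try:
            items.append(data_queue.get(False))
        except Queue.Empty:
            return items


def on_timer(data_queue, volt_list, curr_list):
    """Timer callback body; returns False once the end-of-test marker is seen."""
    for data in drain_queue(data_queue):
        if not data:
            return False  # empty dict marks the end of the test
        volt_list.append(data['volt'])
        curr_list.append(data['curr'])
    # Redraw the plot once here, e.g. line.set_data(volt_list, curr_list)
    return True
```

However many samples arrived since the last tick, the plot is redrawn at most once per timer interval, which decouples the plot update rate from the sample rate.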

                The repository on bitbucket uses mercurial - so you can't directly fork with git. You might be able to use something like git-remote-hg https://github.com/felipec/git-remote-hg

                In the past I've had OK luck with hg-git for keeping hg and git repos in sync. http://hg-git.github.io/ It can be a little tricky.

                You could also use this https://help.github.com/articles/about-github-importer/ However, this is probably a one-way trip, i.e., no going back to hg. So maybe not the best for sharing.

                Of course you could also just use mercurial (hg). If you know git then it would probably only take a few minutes to orient yourself with mercurial https://github.com/sympy/sympy/wiki/Git-hg-rosetta-stone

                • S
                  Steffen last edited by

                  @pguillem
                  Good point. I believe it will also be a more logical structure when it comes to GUI development.

                  @Will-Dickson
                  Thanks a lot Will. Both scripts work well.

                  • R
                    Rodeo_First @Will Dickson last edited by

                    @Will-Dickson

                    Nice code. Do you think this would work for chronoamperometry if I change the parameters in the second script?

                    • W
                      Will Dickson @Rodeo_First last edited by

                      @Rodeo_First

                      Yes. However for chronoamperometry you will probably want to change the plot so that it displays current vs time rather than current vs voltage as the voltages are just stepped in chronoamperometry. Even better you could have two real-time plots - one showing the voltage vs time and the other showing the current vs time. This way you could see when the voltages are stepped.

                      • R
                        Rodeo_First @Will Dickson last edited by

                        @Will-Dickson

                        Thanks again, it was really helpful.

                          • R
                            Rodeo_First last edited by Rodeo_First

                            I've been using this code for a while and it works great.

                            I wonder if there is a way to close the serial port so there is no need to unplug and replug the USB cable between experiments. I tried serial.Serial("COM3").close() but it doesn't work.

                            Also, any suggestions on how to make the plot show only the current subplot rather than both (voltage and current)?

                            I would appreciate any help with this.

                            EDIT: I solved it by restarting the kernel at the end of the code with os._exit(0)
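
A likely reason serial.Serial("COM3").close() has no effect is that it creates a new Serial object for the port instead of closing the one already held by the device object. The scripts in this thread call self.readline() on the Potentiostat instance, which suggests (an assumption, not confirmed in this thread) that the class is built on pyserial's serial.Serial and so has its own close() method. The sketch below uses a hypothetical FakeDevice stand-in so that it runs without hardware; with a real device, dev would be the PotentiostatLivePlot instance.

```python
from contextlib import closing


class FakeDevice(object):
    """Hypothetical stand-in for a serial-backed potentiostat object."""

    def __init__(self):
        self.is_open = True

    def run_test(self, name):
        if not self.is_open:
            raise IOError('port is closed')
        return [0.0, 0.1], [0.0, 0.5], [1.0, 2.0]

    def close(self):
        self.is_open = False


dev = FakeDevice()
with closing(dev):
    # close() is called when this block exits, even after an exception
    t, volt, curr = dev.run_test('cyclic')
```

If the real class does expose close(), calling it on the same object that opened the port should release the port without restarting the kernel.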
