|
#!/usr/bin/env python3
|
|
|
|
# Simulation of discrete-time real-time dynamic control systems
|
|
# with zero-order hold (zoh) discrete time control.
|
|
# It is based on concurrent tasks defined by async coroutines
|
|
# Python 3.7 or higher is necessary (an alternative for Python 3.5
|
|
# is available upon request; however, working with the most recent stable
|
|
# versions is recommended).
|
|
#
|
|
# Dependencies:
|
|
# - asyncio (standard library)
|
|
# - math (standard library)
|
|
# - matplotlib (only needed for plotting)
|
|
# - numpy (only for data buffering and plotting)
|
|
# - prompt_toolkit (for keyboard interaction, essential)
|
|
# - pygments (for syntax coloring with prompt_toolkit)
|
|
#
|
|
# run with:
|
|
# ./demo_discrete_time_zoh_control.py # linux/mac system terminal
|
|
# demo_discrete_time_zoh_control.py # windows cmd or similar terminal
|
|
# python3 demo_discrete_time_zoh_control.py
|
|
#
|
|
# When running from Spyder, take the following points into account:
|
|
# - Configure IPython to not inline graphics, under:
|
|
# Tools -> Preferences -> IPython console -> Graphics -> Graphics backend
|
|
# - Configure to run in external system terminal to prevent conflicts with
|
|
# the asyncio event loop that spyder is already using:
|
|
# Tools -> Preferences -> Run -> tick Execute in an external system terminal
|
|
#
|
|
# Rufus Fraanje, p.r.fraanje@hhs.nl
|
|
# Date: 13/11/2020
|
|
# License: GPLv3
|
|
|
|
#
|
|
# nest_asyncio may help to run in spyder, but for me only running
|
|
# in external system terminal works
|
|
# import nest_asyncio
|
|
# nest_asyncio.apply()
|
|
|
|
import asyncio
|
|
|
|
from math import pi, cos, sin
|
|
|
|
from time import time_ns
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
import numpy as np
|
|
|
|
from prompt_toolkit import ANSI, PromptSession
|
|
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
|
|
from prompt_toolkit.completion import WordCompleter
|
|
from prompt_toolkit.lexers import PygmentsLexer
|
|
from prompt_toolkit.patch_stdout import patch_stdout
|
|
|
|
from pygments.lexers.python import Python3Lexer
|
|
|
|
# process simulation task (more precisely: coroutine, same for 'tasks' below)
|
|
async def process(shared_dict, dt=0.01):
    """Simulate the plant: a double integrator (pure mass), one step per dt.

    shared_dict is the dictionary shared by all tasks.  As long as
    shared_dict['run'] is true, every iteration:

    - reads the current control value from shared_dict['u'],
    - publishes the current angle (the value *before* this step's update)
      in shared_dict['y'],
    - advances angle and angular rate by one forward-Euler step of size dt,
    - stores the measured wall-clock time since the previous iteration,
      in milliseconds, in shared_dict['dt'],
    - sleeps dt seconds so that the other tasks get a chance to run.
    """
    angle = 1  # initial angle (the original author also suggested pi-0.1)
    rate = 0   # initial angular rate
    stamp_ns = time_ns()
    while shared_dict['run']:
        accel = shared_dict['u']  # double integrator: d(rate)/dt = u
        # For a pendulum one would use instead:
        #   d(angle)/dt = rate
        #   d(rate)/dt  = -sin(angle) + u * cos(angle)
        shared_dict['y'] = angle

        # Crude extrapolation (forward Euler): both right-hand sides use the
        # values from before the step, hence the simultaneous assignment.
        angle, rate = angle + dt * rate, rate + dt * accel

        previous_ns = stamp_ns
        stamp_ns = time_ns()                                # times in ns
        shared_dict['dt'] = (stamp_ns - previous_ns) / 1e6  # delta time in ms

        await asyncio.sleep(dt)  # yield to the other tasks for dt seconds
|
|
|
|
|
|
# controller task
|
|
async def control(shared_dict, dt=0.01):
    """Discrete-time PD controller task, one control update per dt seconds.

    shared_dict is the dictionary shared by all tasks.  As long as
    shared_dict['run'] is true, every iteration writes a control value to
    shared_dict['u'] and then sleeps dt seconds:

    - when shared_dict['control'] is false the control value is 0;
    - otherwise the error e = shared_dict['r'] - shared_dict['y'] is formed
      and u = Kp * e + Kd * de/dt is written, where de/dt is the backward
      difference of the error over one sample period dt.

    The derivative term is taken as 0 whenever no valid previous error
    sample exists (on the very first controlled sample and on the first
    sample after control was switched off), and also when dt is not
    positive.  This avoids a spurious "derivative kick" caused by
    differencing against a made-up or stale error, and a division by zero.
    """
    Kp, Kd = 10, 1
    e_prev = None  # previous error sample; None means "not available"
    while shared_dict['run']:
        if shared_dict['control']:
            e = shared_dict['r'] - shared_dict['y']
            if e_prev is None or dt <= 0:
                de_dt = 0.
            else:
                de_dt = (e - e_prev) / dt
            u = Kp * e + Kd * de_dt
            e_prev = e
        else:
            u = 0.
            e_prev = None  # forget the stale error while control is off
        shared_dict['u'] = u
        await asyncio.sleep(dt)
|
|
|
|
|
|
# command line repl (read, eval, print loop) task for user interaction
|
|
async def repl(shared_dict):
    """REPL (read, eval, print loop) task for user interaction.

    Shows a prompt at which single-line Python commands can be entered and
    gives access to shared_dict, both for inspection and for adjusting
    values.  Expressions (code that gives a result) are evaluated and the
    result is printed unless it is None, e.g.:

        shared_dict['u']             # shows the current control value

    Anything that is not an expression is executed as a statement, e.g.:

        shared_dict['print'] = True  # lets the printer task print

    Exceptions raised by the entered code are printed, not propagated.
    Entering "stop", or pressing Ctrl-D / Ctrl-C at the prompt, sets
    shared_dict['run'] to False, which ends this task's loop.

    While the prompt is waiting for input the other tasks keep running,
    thus providing concurrent behavior.

    NOTE: the entered text is passed to eval()/exec() by design; this prompt
    must never be connected to untrusted input.
    """
    session = PromptSession(multiline=False)
    # Words offered for completion at the prompt.
    my_completer = WordCompleter(['shared_dict', 'stop', 'False', 'True',
                                  'print', 'run', 'control', 'plot',
                                  'u', 'y', 'plot_buffer'])

    # Explicit namespace for the entered code: a copy of the module globals
    # plus shared_dict.  Relying on the implicit function-local namespace
    # makes it Python-version dependent whether names assigned at the prompt
    # are still visible to later commands.
    namespace = dict(globals())
    namespace['shared_dict'] = shared_dict

    print('Enter your single line command and press return, enter \"stop\" to exit.')
    while shared_dict['run']:
        try:
            with patch_stdout():  # to prevent screen clutter at the prompt line
                res = await session.prompt_async(ANSI('\x1b[01;34m-> '),
                                                 lexer=PygmentsLexer(Python3Lexer),
                                                 completer=my_completer,
                                                 auto_suggest=AutoSuggestFromHistory())
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt: shut down cleanly instead of
            # letting the exception abort all tasks.
            res = 'stop'

        command = res.strip()  # surrounding blanks would confuse eval/exec
        if command == 'stop':  # quick way to stop
            shared_dict['run'] = False
            continue
        if not command:        # nothing entered
            continue

        try:  # first try to evaluate as an expression
            result = eval(command, namespace)
            if result is not None:  # print result when not trivial
                print(result)
        except SyntaxError:
            try:  # not an expression: try to execute it as a statement
                exec(command, namespace)
            except Exception as e:  # report errors, keep the REPL alive
                print(e)
        except Exception as e:  # idem
            print(e)
|
|
|
|
|
|
# status printer task
|
|
async def printer(shared_dict, dt=1):
    """Status printer task: report some shared_dict values every dt seconds.

    While shared_dict['run'] is true the task wakes up every dt seconds and,
    only if shared_dict['print'] is true, prints one line with the current
    values of 'run', 'control', 'y' and 'u'.
    """
    while True:
        if not shared_dict['run']:
            break
        if shared_dict['print']:
            status = f"run = {shared_dict['run']}, control = {shared_dict['control']}, y = {shared_dict['y']}, u = {shared_dict['u']}"
            print(status)
        await asyncio.sleep(dt)
|
|
|
|
|
|
# fast cyclic fifo buffer class for storing signal data
|
|
# (writes each sample twice, so the buffer has double length,
|
|
# sacrificing some memory to prevent time-consuming memory shifting)
|
|
class CyclicBuffer(object):
    """Fast cyclic FIFO (first in, first out) buffer for signal samples.

    The internal storage is a plain list of twice the requested length and
    every sample is written at two positions, `length` apart.  Because of
    that, the most recent `length` samples are always available as one
    contiguous slice, so no data has to be shifted on update.
    """

    def __init__(self, length=1, dims_sample=(1,)):
        """Create the cyclic buffer for `length` (integer) samples.

        dims_sample is a tuple with the dimensions of each sample: (4,) for
        a sample that is a list of 4 elements, two entries for matrices,
        more for tensors, and the empty tuple () for scalar samples.  The
        buffer is pre-filled with zero samples (nested lists of 0.) of that
        shape.
        """
        self._length = length
        self._dims_sample = tuple(dims_sample)
        # Every slot gets its own freshly built zero sample (no aliasing).
        self._buffer = [self._zeros(self._dims_sample)
                        for _ in range(2 * self._length)]
        self._last = self._length  # index at which the next sample is written

    @staticmethod
    def _zeros(dims):
        """Return a nested list of 0. with shape dims (a scalar 0. for ())."""
        if not dims:
            return 0.
        return [CyclicBuffer._zeros(dims[1:]) for _ in range(dims[0])]

    def get(self):
        """Return the `length` most recent samples as a list, oldest first."""
        return self._buffer[self._last - self._length:self._last]

    def update(self, sample):
        """Append sample to the buffer, dropping the oldest sample.

        sample is a list or a numpy array whose dimensions should match the
        dims_sample given to __init__ (this is not checked).  The sample
        object itself is stored, not a copy.
        """
        last = self._last  # position for the new sample
        length = self._length
        # store the sample at position last and at last - length:
        self._buffer[last] = self._buffer[last - length] = sample
        last += 1
        if last >= 2 * length:  # end of storage reached: wrap around
            last = length
        self._last = last
|
|
|
|
|
|
# update plot buffer task
|
|
async def update_plot_buffer(shared_dict, dt=0.01):
    """Task that feeds the plot buffer with a new sample every dt seconds.

    While shared_dict['run'] is true, the current values of 'y', 'u' and
    'dt' (in that order) are collected into one sample, which is handed to
    the update method of shared_dict['plot_buffer'].
    """
    signal_keys = ('y', 'u', 'dt')
    while shared_dict['run']:
        snapshot = [shared_dict[key] for key in signal_keys]
        shared_dict['plot_buffer'].update(snapshot)
        await asyncio.sleep(dt)
|
|
|
|
|
|
# plotter task
|
|
async def plotter(shared_dict, dt=0.05):
    """Task that redraws the buffered signals every dt seconds.

    A figure with four stacked axes is created once.  While
    shared_dict['run'] is true and shared_dict['plot'] is true, the axes are
    cleared and the samples returned by shared_dict['plot_buffer'].get() are
    plotted again: column 0 ('y') in the first axes, column 1 ('u') in the
    second and column 2 (the loop time 'dt' stored by the process task, in
    ms) in the fourth.  The third axes are left empty.

    The figure is closed when the task ends, also when it ends because of
    an exception or because the task is cancelled.
    """
    plt.ioff()  # do not make the plot interactive, else the plot is not
    # updated correctly or it takes the focus from the prompt to the figure,
    # which is quite annoying.
    fig, axes = plt.subplots(4, 1)
    plt.show(block=False)
    try:
        while shared_dict['run']:
            if shared_dict['plot']:
                for ax in axes:  # clear all axes
                    ax.cla()
                data = np.array(shared_dict['plot_buffer'].get())
                axes[0].plot(data[:, 0])
                axes[0].set_ylabel('y')
                axes[1].plot(data[:, 1])
                axes[1].set_ylabel('u')
                # axes[2] is intentionally left empty; it can be used for an
                # animation of the process, e.g. for a pendulum with angle y:
                #   axes[2].plot([0, sin(y)], [0, -cos(y)], lw=2)
                axes[3].plot(data[:, 2])
                axes[3].set_ylabel('dt [ms]')
                fig.canvas.draw()          # this and the following line
                fig.canvas.flush_events()  # update the figure on screen
            await asyncio.sleep(dt)
    finally:
        plt.close(fig)  # at end of task, always close the figure
|
|
|
|
|
|
# main function:
|
|
async def main():
    """Main task: initialize the shared state and run all tasks concurrently.

    Builds the dictionary shared by all tasks and then awaits the process,
    control, repl, printer, update_plot_buffer and plotter coroutines
    together; main returns when all of them have finished.
    """
    buffer_size = 100   # number of samples in plot
    dims_sample = (3,)  # each plot sample holds three values: y, u and dt

    shared_dict = {
        'run': True,      # when set to False, will stop simulation
        'control': True,  # to control or not
        'print': False,   # to print or not
        'plot': True,     # to plot or not
        'r': 0.,          # reference value to control to
        'u': 0.,          # initial value control signal
        # Initial value of the output signal.  The process task overwrites
        # it on every iteration; having it here keeps the other tasks from
        # failing with a KeyError when the process task is commented out.
        'y': 0.,
        'dt': 0,          # measured loop time of the process task
        # fast FIFO cyclic buffer to store the last buffer_size samples
        'plot_buffer': CyclicBuffer(buffer_size, dims_sample),
    }

    # tasks to perform concurrently (just comment tasks you don't need):
    await asyncio.gather(
        process(shared_dict),
        control(shared_dict),
        repl(shared_dict),
        printer(shared_dict),
        update_plot_buffer(shared_dict),
        plotter(shared_dict),
    )
|
|
|
|
|
|
# Only run the tasks when run as a script (rather than a module)
|
|
if __name__ == '__main__':
    # Executed as a script: let asyncio run the main() coroutine until all
    # tasks have finished.
    asyncio.run(main())
|