Source code for common

import sys
from math import gcd
import datetime
# for plotting
import plotly.express as px
import pandas as pd
import numpy as np


def versiontuple(v):
    """Convert a package version string into a tuple for comparison.

    Only the leading ASCII digits of every dot-separated component are
    used, so pre-release suffixes such as ``"5.0.0rc1"`` do not raise.
    Parsing stops at the first component that has no leading digits
    (e.g. the ``dev1`` part of ``"4.9.0.dev1"``).

    :param v: string version, e.g "2.3.1".
    :type v: str
    :return: The return tuple, e.g. (2,3,1).
    :rtype: tuple
    :raises ValueError: if ``v`` does not start with a numeric component.

    :Example:

    >>> versiontuple("2.3.1") > versiontuple("10.1.1")
    False
    >>> versiontuple("5.0.0rc1")
    (5, 0, 0)
    """
    numbers = []
    for component in v.split("."):
        component = component.strip()
        # collect only the numeric prefix, e.g. "0rc1" -> "0"
        digits = ""
        for char in component:
            if not "0" <= char <= "9":
                break
            digits += char
        if not digits:
            # non-numeric component ("dev1", "post", ...): nothing left to compare
            break
        numbers.append(int(digits))
    if not numbers:
        raise ValueError("invalid version string: %r" % (v,))
    return tuple(numbers)
def check_rms_edf(task_list):
    """Validate the task list fields required by the RMS and EDF algorithms.

    .. literalinclude:: ../../wikipedia.yaml
        :language: yaml
        :linenos:

    The list must hold more than one task. Every task must provide a
    ``name`` (str) plus ``exec_time``, ``period`` and ``deadline``, each a
    strictly positive ``int``. The first problem found is printed and the
    check stops there.

    :param task_list: List of task descriptors, as in the example above.
    :type task_list: List of dictionaries.
    :return: True for success, False otherwise.
    :rtype: bool
    """
    mandatory_fields = ('name', 'exec_time', 'period', 'deadline')
    integer_fields = ('exec_time', 'period', 'deadline')

    # must have at least 2 tasks
    if len(task_list) <= 1:
        print("ERROR: the task list must have more than 1 task. Found", len(task_list))
        return False

    print('checking the task list ... ', end='')

    # first pass: every task must carry all the mandatory fields
    for entry in task_list:
        for field in mandatory_fields:
            if field not in entry:
                print("\nERROR: field '%s' not found in task" % field)
                return False

    # second pass: name is a str, the remaining fields are positive ints
    for entry in task_list:
        name_type = type(entry['name'])
        if name_type is not str:
            print("\nERROR: string expected in the 'name' field. Got", name_type)
            return False
        for field in integer_fields:
            value = entry[field]
            if type(value) is not int:
                print("\nERROR: int expected in the '%s' field. Got" % field, type(value))
                return False
            if value <= 0:
                print("\nERROR: '%s' field must be a positive integer. Got" % field, value)
                return False

    print('passed !')
    return True
def check_sched(sched):
    """Validate the schedule produced by a scheduling algorithm.

    .. literalinclude:: ../../wikipedia-sched.yaml
        :language: yaml
        :linenos:

    The schedule must be a dictionary with a non-empty ``sched`` list.
    Every task in that list needs a ``name`` and a ``jobs`` list; every
    job is a ``[start, end]`` pair of ints with ``0 <= start <= end``.
    A task with an empty job list only produces a warning. The first
    error found is printed and the check stops there.

    :param sched: Schedule descriptor, as in the example above.
    :type sched: dict
    :return: True for success, False otherwise.
    :rtype: bool
    """
    # malformed input is reported as a validation failure, not an exception
    if not isinstance(sched, dict) or 'sched' not in sched:
        print("ERROR: field 'sched' not found in the schedule")
        return False
    sched_tasks = sched['sched']
    if not isinstance(sched_tasks, (list, tuple)):
        print("ERROR: field 'sched' must be a list of tasks. Got", type(sched_tasks))
        return False

    # must have at least 1 task
    if len(sched_tasks) < 1:
        print("ERROR: the sched list must have at least 1 task. Found", len(sched_tasks))
        return False

    # check if all tasks have the mandatory fields
    print('checking the scheduling list ... ', end='')
    for task in sched_tasks:
        if 'name' not in task:
            print("\nERROR: field 'name' not found in task")
            return False
        if 'jobs' not in task:
            print("\nERROR: field 'jobs' not found in task")
            return False
        jobs = task['jobs']
        if not isinstance(jobs, (list, tuple)):
            print("\nERROR: field 'jobs' must be a list. Got", type(jobs))
            return False
        if len(jobs) <= 0:
            # a task without jobs is tolerated; it is only reported
            print("\nWARNING: task %s has no job. Got" % task['name'], len(jobs))
        for job in jobs:
            if not isinstance(job, (list, tuple)) or len(job) < 2:
                print("\nERROR: each job must be a [start, end] pair. Got", job)
                return False
            start, end = job[0], job[1]
            if type(start) is not int or type(end) is not int:
                print("\nERROR: jobs must be int initial and final times. Got", type(start), type(end))
                return False
            if start > end:
                print("\nERROR: the initial job time must not be greater than the final time. Got", start, end)
                return False
            # with start <= end already enforced, a non-negative start also
            # guarantees a non-negative end, so one check covers both
            if start < 0:
                print("\nERROR: the initial job time must be greater than or equal to 0. Got", start)
                return False
    print('passed !')
    return True
def convert_to_datetime(x):
    """Map a natural number onto a calendar date string.

    The value is interpreted as a number of days after 1970-01-01 (UTC),
    so 0 yields "1970-01-01" and 1 yields "1970-01-02".

    :param x: natural value
    :type x: int
    :return: the resulting date formatted as "YYYY-MM-DD"
    :rtype: str
    """
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    shifted = epoch + datetime.timedelta(days=x)
    return shifted.strftime("%Y-%m-%d")
def plot_gantt(sched, verbose = False):
    """Use the plotly lib to plot the gantt chart.

    .. literalinclude:: ../../wikipedia-sched.yaml
        :language: yaml
        :linenos:

    The schedule is validated with :func:`check_sched` first; on invalid
    input the process exits with status 1. Time ticks are mapped to dates
    (one tick per day, starting at 1970-01-01) because the timeline plot
    works on dates; the x axis labels are then replaced by the tick numbers.

    :param sched: The shedule YAML file, as in the example above.
    :type sched: List of dictionaries.
    :param verbose: enable/disable verbose mode (currently not used by this function)
    :type verbose: bool
    :return: False when the installed plotly is older than 4.9.0, None otherwise.

    .. todo:: add a slider

        https://plotly.com/python/animations/
        https://plotly.com/python/sliders/

    .. todo:: provide an alternative plotting option with matplotlib

        https://matplotlib.org/3.1.0/gallery/lines_bars_and_markers/broken_barh.html

    .. todo:: other alternative plots

        https://github.com/ehsan-elwan/RM-Task-Scheduling/blob/master/Plotter.py
        https://github.com/johnharakas/scheduling-des/blob/sim-plotting/Qt_Canvas.py
        https://github.com/esalehi1996/Realtime_Scheduling_python/blob/master/main.py
        https://github.com/carlosgeos/uniprocessor-scheduler/blob/master/src/simulation.py
        https://github.com/ksameersrk/rt-scheduler/blob/master/analysis/plot_graph.py
        https://github.com/guilyx/gantt-trampoline/blob/master/lib/GanttPlot.py

    .. todo:: export figure with the plot

        https://plotly.com/python/static-image-export/
    """
    # check plotly version
    import plotly as pl
    if versiontuple(pl.__version__) < versiontuple("4.9.0"):
        print ("ERROR: the scheduling plotting function requires plotly 4.9.0 or newer. Found", pl.__version__)
        return False

    # check the input argument
    if not check_sched(sched):
        print("Aborting execution of scheduling plotting due to invalid input file.")
        sys.exit(1)

    # get the max value of x of the schedule to be used in the plot
    max_x = 0
    # create the data format required by pandas DataFrame
    list_tasks = []
    for task in sched['sched']:
        # color is not mandatory for a task; 'blue' is the default color
        task_color = task.get('color', 'blue')
        if len(task['jobs']) == 0:
            # place the task in the chart even if it had no job executed
            list_tasks.append(dict(Task=task['name'],
                Start=convert_to_datetime(0),
                Finish=convert_to_datetime(0),
                Color = task_color,
                # used only by the hover feature
                Start_tick = 0,
                Finish_tick = 0,
                Duration = 0
                ))
        else:
            for job in task['jobs']:
                list_tasks.append(dict(Task=task['name'],
                    Start=convert_to_datetime(job[0]),
                    Finish=convert_to_datetime(job[1]),
                    Color = task_color,
                    # used only by the hover feature
                    Start_tick = job[0],
                    Finish_tick = job[1],
                    Duration = job[1]-job[0]
                    ))
                max_x = max(max_x, job[0], job[1])

    # creating the pandas DataFrame required by plotly
    df = pd.DataFrame(list_tasks)

    # title is optional
    chart_title = sched.get('title', '')

    fig = px.timeline(df, title = chart_title, x_start="Start", x_end="Finish", color = "Color", y="Task",
        # info used only for the hover feature
        custom_data = ['Start_tick','Finish_tick','Duration'])
    fig.update_yaxes(autorange="reversed") # otherwise tasks are listed from the bottom up
    # format the data appearing in the mouse hover feature;
    # customdata follows the order given in custom_data above
    fig.update_traces(
        hovertemplate = "Start: %{customdata[0]}<br>Finish: %{customdata[1]}<br>Duration: %{customdata[2]}"
    )

    # this part converts dates into ticks: one label per tick from 0 to max_x
    num_tick_labels = np.linspace(start = 0, stop = max_x, num = max_x+1, dtype = int)
    date_ticks = [convert_to_datetime(int(x)) for x in num_tick_labels]
    fig.layout.xaxis.update({
        'tickvals' : date_ticks,
        'ticktext' : num_tick_labels
        })
    # it will show in the browser
    fig.show()
def sched_list_2_sched_dict(tasks,sched_list,verbose=False):
    """Schedule format conversion.

    Convert a scheduling in format of a list into a schedule in the format
    of list of dictionary.

    .. literalinclude:: ../../wikipedia.yaml
        :language: yaml
        :linenos:

    Every maximal run of consecutive ticks where a task appears in
    ``sched_list`` becomes one job ``[start, end]``, where ``start`` is the
    index of the first tick of the run and ``end`` is the index just past
    its last tick. A run that reaches the end of ``sched_list`` therefore
    ends at ``len(sched_list)``.

    :param tasks: List of tasks descriptors, as in the example above.
    :param sched_list: List the execution order of the jobs, e.g. ["P1","P1","P1","idle","P3","P2","P3", ...].
    :param verbose: Enable/disable the verbosity level.
    :type tasks: List of dictionaries.
    :type sched_list: List of str.
    :type verbose: bool.

    .. literalinclude:: ../../wikipedia-sched.yaml
        :language: yaml
        :linenos:

    :return: List of schedule descriptors, as in the example above.
    :rtype: List of dictionaries.
    """
    # creating the data structure for scheduling
    sched = {'title': 'Some title', 'sched': []}
    for task in tasks:
        task_name = task['name']
        sched_task = {
            'name': task_name,
            'jobs': [],
            # the idle task gets its own color
            'color': 'green' if task_name == 'idle' else 'blue',
        }
        if verbose:
            print ("#############", task_name, "#############")
            print (sched_task)

        # search for the runs of this task in the schedule
        run_start = None  # first tick of the run in progress, None when not running
        for tick, running in enumerate(sched_list):
            if running == task_name:
                if run_start is None:
                    run_start = tick
            elif run_start is not None:
                # another task took over at this tick: close the run
                sched_task['jobs'].append([run_start, tick])
                run_start = None
        if run_start is not None:
            # the run lasted until the end of the schedule
            sched_task['jobs'].append([run_start, len(sched_list)])

        sched['sched'].append(sched_task)
        if verbose:
            print (sched_task)
            print ("##########################")

    return sched