Source code for rms

from common import check_rms_edf, sched_list_2_sched_dict
import numpy as np
import sys


[docs]def rms_is_schedulable(tasks): """Check the task set schedulability for RMS. Check whether the specified task set is schedulable under RMS algorithm. :param tasks: list of task descriptors. :return: The return value. True for success, False otherwise. :rtype: bool. """ totalUse = sum(task['exec_time']/float(task['period']) for task in tasks) n = len(tasks) if(n == 0 ): return #check scallability based off of total use and number os tasks print ("RMS schedudability:",totalUse, " <= ", n*(2**(1/n)-1)) if (totalUse > 1.0): print("ERROR: total CPU usage > 100%.") return False if (totalUse <= n*(2**(1/n)-1)): print("The tasks are provably schedulable.") else: print("The tasks might be scheludable, with no guarantees.") return True
[docs]def rms(task_list, sim_time=0, verbose=False): """Simulates the Rate Monotonic (RM) scheduling algorithm. :param task_list: List of task descriptors. :param sim_time: Time for simulation. If none is defined, then LCM (Lowest Common Multiple) of periods is used. :type sim_time: int :param verbose: :type verbose: bool :return: sched :rtype: schedule list for each task (List of dictionaries). """ # check the input syntax if not check_rms_edf(task_list): print("Aborting execution of RMS algorithm due to invalid input file.") sys.exit(1) # check schedulability of the task set if not rms_is_schedulable(task_list): print("Aborting execution of RMS algorithm since this task set is not schedulable for RMS.") sys.exit(1) # if the simulation time is not specified by the user, then use the LCM of the task periods if sim_time == 0: # Returns the LCM (Lowest Common Multiple) of a list of integers # this will determine the max simulation time list_period=[] for task in task_list: list_period.append(task['period']) sim_time = np.lcm.reduce(list_period) print ("The simulation time is:", sim_time) # assuming all the tasks start at time zero, initialize the OS's ready_list ready_list = [] for task in task_list: ready_list.append([task['exec_time'], task]) #print (ready_list) schedule = [] for i in range(1,sim_time+1): # check if there are tasks to be included in the ready_list for task in task_list: # check if it is time to start another job if ((i % task['period']) == 0): ready_list.append([task['exec_time'], task]) # shortest period first ready_list.sort(key=lambda x: x[1]['period'], reverse=False) if len(ready_list) ==0: schedule.append('idle') # skip this OS tick continue # top task gain access to the cpu schedule.append(ready_list[0][1]['name']) # decrement computation time of the top job ready_list[0][0] -= 1 # check if there are jobs to be included in the ready_list for job in ready_list: # check if the job finished, then delete the top of the list if job[0] == 0: del ready_list[0] if verbose: print (schedule) # artificially including a new task called idle to track the CPU idle time task_list.append( dict( name= 'idle', exec_time= 1, deadline= 1, period= 1, ) ) sched = sched_list_2_sched_dict(task_list,schedule,verbose) return sched