Julian
Julian

Reputation: 15

Simulation of interrupted set-up and delayed server shutdown in batch processing system

I would like to create a system with servers that need time to set up before they are ready to serve. The system starts setting up a server whenever enough customers to form a batch (2 customers) are in the queue.

When a batch (a group of 2 customers) leaves the system, we check whether there is a batch waiting to be served. If so, we keep the server active, serve that batch immediately, and interrupt the server that is setting up. Otherwise, we keep the server in idle mode for a while (the delay time) before turning it off. During that time, if any batch arrives, we interrupt the delay process and the idle server takes the batch and serves it immediately.

The version without the delay process already works, thanks to the answer by Michael R. Gibbs (Simulation of servers processing batches with interrupting set-up/switch-on times). I tried to modify that code to add a delay process; however, there is a problem: a server whose set-up process is interrupted still goes through the delay process instead of shutting down immediately as expected.

Here is my code:


import simpy
import random
# import numpy as np

class param:
    """
    Settings for one simulation run.

    x is stored as MEAN_SWITCH_TIME; the server set-up delay used by the
    simulation is 1/MEAN_SWITCH_TIME, so x acts as the set-up rate.
    """

    def __init__(self, x):
        # Durations handed directly to env.timeout(): time between arrivals
        # and time to serve one batch.
        self.MEAN_INTERARRIVAL = self.MEAN_SERVICE_TIME = 1
        # How long an idle server is kept on before it is turned off.
        self.MEAN_DELAY_TIME = 3
        # Set-up rate (set-up takes 1/MEAN_SWITCH_TIME).
        self.MEAN_SWITCH_TIME = x

        # Upper bound on servers that may exist at once.
        self.NUM_OF_SERVER = 20
        # Upper bound on customers in the system; arrivals beyond it balk.
        self.MAX_SYS_SIZE = 5_000_000
        # Customers per batch.
        self.BATCH_SIZE = 2

        # Run control: seed for random and the simulated time to run for.
        self.RANDOM_SEED = 0
        self.DURATION_TIME = 12
        

# there is no waiting, so normal lists are good enough

class Server():
    """
    Server that processes batches.

    A server is created in the set-up state (start_up).  When set-up
    completes it serves batches (process); when no batch is left it starts
    a delay-off period (delayoff).  Set-up can be interrupted via stop().

    The methods read and update module-level state (num_servers,
    num_active_server, the statistics lists, and the queues) in addition to
    the attributes stored on the instance.
    """
    
    def __init__(self, id, env, processing_q, server_q, delayoff_q, param):
        """
        Store the queues and immediately start the set-up process.

        id:           identifier used in log messages
        env:          simpy environment
        processing_q: list of batches waiting for service
        server_q:     list of servers that are still setting up
        delayoff_q:   list of idle servers in their delay-off period
        param:        simulation settings (see class param)
        """

        self.id = id
        self.env = env
        self.processing_q = processing_q
        self.server_q = server_q
        # NOTE(review): delayoff_q is accepted but never stored on the
        # instance; the methods below use the name delayoff_q directly, which
        # resolves to the module-level list.

        # keep a handle on the set-up process so stop() can interrupt it
        self.start_process = self.env.process(self.start_up(param))
        # self.delay_process = self.env.process(self.delayoff(param))

        global num_servers
        # server exists (setting up), but is not active yet
        num_servers +=1
        
    def delayoff(self, param):
        """
        keep server in idle mode before turning it off (delayoff)

        Waits MEAN_DELAY_TIME and logs the shutdown; if the process is
        interrupted first, logs that the idle server took a batch.

        NOTE(review): this method only logs.  It does not remove the server
        from delayoff_q (that line is commented out) and does not change
        num_servers.
        """
        global num_servers 
        
        try:   
            print(f'{env.now} server {self.id} start delayoff, delayoff servers:{len(delayoff_q)}, setup servers:{len(server_q)}')
            yield self.env.timeout(param.MEAN_DELAY_TIME)   
            # delayoff_q.remove(self)
            print(f'{env.now} server {self.id} shutdown after delayoff, delayoff servers:{len(delayoff_q)}, setup servers:{len(server_q)}')
        except simpy.Interrupt:

            print(f'{self.env.now} server{self.id} is interupted (idle server steal batch), # batches in queue: {len(processing_q)}')

        

    def start_up(self, param):
        """
        starts up the server, then start processing batches

        Set-up takes 1/MEAN_SWITCH_TIME.  It can be interrupted (see stop()),
        in which case the server never becomes active and num_servers is
        decremented.
        """
        global num_servers,num_active_server
        
        # record the number of servers setting up at each state transition
        state_time_trans.append(self.env.now)
        num_server_state.append(len(server_q))
                
        try:  
            print(f'{self.env.now} server {self.id} start to setup, num server setting up: {len(server_q)}')
            
            
            yield self.env.timeout(1/param.MEAN_SWITCH_TIME)
            # yield self.env.timeout(np.random.exponential(1/param.MEAN_SWITCH_TIME))
            
            # server has started, need to remove from startup q
            self.server_q.remove(self)
            print(f'{self.env.now} server {self.id} active and start processing, num server setting up: {len(server_q)}')
            state_time_trans.append(env.now)
            num_server_state.append(len(server_q))
                        
            
            # hand over to the batch-processing loop as a separate process
            self.env.process(self.process(param))
                    
        except simpy.Interrupt:
            
            print(f'{env.now} server {self.id} has been interupted, num server setting up: {len(server_q)}-------------------')
            state_time_trans.append(env.now)
            num_server_state.append(len(server_q))
            
            # server is stopping, need to adjust server count
            num_servers -= 1
            

    def process(self, param):
        """
        process batches
        keeps going as long as there are batches in queue

        After each batch, if more batches are waiting and at least as many
        servers are setting up as batches are waiting, one setting-up server
        is stopped.  When no batch is waiting, the server is put in
        delayoff_q, a delayoff process is started, and this loop ends.
        """
        global num_servers, num_active_server
        # count server becoming active only once
        num_active_server += 1
        
        # save the time a server becomes active
        state_server_active.append(num_active_server)
        time_active.append(env.now)
        #print(f'{self.env.now} server {self.id} active, num server active: {num_active_server}')

        while True:
            # server becomes active only once, this is counting batches processed
            # num_active_server += 1

            # take the oldest waiting batch (uses the module-level processing_q)
            b = processing_q.pop(0)
            
               
            yield self.env.timeout(param.MEAN_SERVICE_TIME)
            # yield env.timeout(np.random.exponential(1/param.MEAN_SERVICE_TIME))
            
            # stamp every customer of the batch with the service completion time
            for i in range(0, len(b)):
                b[i].serve_time = env.now
                dic_serve_time.update({b[i].name: b[i].serve_time})
            # the message indexes b[0] and b[1], i.e. it expects batches of two
            print(f'{self.env.now} server {self.id} finish to serve customer {b[1].name},  {b[0].name}')
            # Server is still running

            if len(self.processing_q) > 0:
                # more batches to do,
                # steal a batch from a server that is still starting up

                # need to check the length, not if it exists
                if len(self.server_q) >= len(self.processing_q):
                #if len(self.server_q) > 0:
                    s = self.server_q.pop()   # list.pop() takes the most recently added server
                    s.stop()
                    print(f'{self.env.now} server {self.id} continue to take a new batch')
                    

            # else:
            #     print(f'{env.now} server {self.id} no more batches, shut down')
            #     break
            
            else:
                # now the server is idle
                num_active_server -= 1
                delayoff_q.append(self)
                # NOTE(review): the process object returned here is not kept,
                # so nothing can interrupt this particular delay later on.
                self.env.process(self.delayoff(param))
                print(f'{env.now} server {self.id} delayoff, have {len(delayoff_q)} idle servers')
                break
        
        # NOTE(review): this runs as soon as the loop breaks, i.e. when the
        # delay-off period starts, not when it ends.
        #num_active_server -= 1
        num_servers -= 1
        
        state_server_active.append(num_active_server)
        time_active.append(env.now)
        #print(f'{self.env.now} server {self.id} active, num server active: {num_active_server}')


    def stop(self):
        """
        Interrupts server start up, stopping server

        NOTE(review): the bare except silently ignores any error raised by
        interrupt().
        """
        try:
            self.start_process.interrupt()
        except:
            pass

def gen_arrivals(env, batching_q, processing_q, server_q, delayoff_q, param):
    """
    Generate arriving customers (simpy process).

    One customer arrives every MEAN_INTERARRIVAL.  If the system size would
    exceed MAX_SYS_SIZE the customer balks and is removed again.

    Once BATCH_SIZE customers are waiting, a batch is formed and appended to
    processing_q.  If no server is in delayoff_q, a new server is set up
    (subject to NUM_OF_SERVER); otherwise the first server in delayoff_q is
    taken to serve the batch.

    Updates the module-level counters num_balk and num_cumulative_customer
    and appends the running balking probability to list_prob_balk.
    """
    global num_servers, num_balk, num_cumulative_customer, num_active_server 
    id = 1      # id given to the next server created
    name = 1    # name given to the next customer created
    while True:
        yield env.timeout(param.MEAN_INTERARRIVAL)
        # yield env.timeout(np.random.exponential(1/param.MEAN_INTERARRIVAL))
        num_cumulative_customer += 1

        # cap on the number of customers ever created
        if name < 10000000000:
            customer = Customer(name)
            batching_q.append(customer) 
            name += 1

        # customers waiting: not yet batched + batched but not yet in service
        q_size  = len(batching_q) + (param.BATCH_SIZE * len(processing_q)) 
        # customers in the system: waiting + one batch per active server
        sys_size = q_size + (num_active_server * param.BATCH_SIZE) 

        #if q_size > max_q_size:
        if  sys_size > param.MAX_SYS_SIZE:    
            num_balk += 1

            # the customer balks: undo the append above
            batching_q.pop(-1)                
            #print(f'{env.now} customer {customer.name} arrived and aborted, sys len: {sys_size }')

        else:
            #customer = object()                        
            #batching_q.append(customer)

            print(f'{env.now} customer{customer.name} has arrived, q len: {q_size}, sys len: {sys_size}')
            customer.arrival_time = env.now
            dic_arrival_time.update({customer.name: customer.arrival_time})
            
            # check if a batch can be created
            while len(batching_q) >= param.BATCH_SIZE:
                batch = list()
                while len(batch) < param.BATCH_SIZE:
                    batch.append(batching_q.pop(0))

                # put batch in processing q
                processing_q.append(batch)
                
                print(f'{env.now} new batch arrived, #delay: {len(delayoff_q)}')
   
                if len(delayoff_q) == 0:   # there is no idle server, need to set up a server
                    if num_servers < param.NUM_OF_SERVER:   
                        server = Server(id, env, processing_q, server_q, delayoff_q, param)
                        id += 1
                        server_q.append(server)
                        # print(f'{env.now} setup a new server {server.id}')
                else: 
                    s_delay = delayoff_q.pop(0)
                    # NOTE(review): this starts a brand-new delayoff process and
                    # interrupts that one; the delayoff process started earlier
                    # in Server.process is a different process object and is
                    # not affected by this call.
                    env.process(s_delay.delayoff(param)).interrupt()
                    env.process(s_delay.process(param))

        # calculate balking probability so far
        prob_balk = num_balk/num_cumulative_customer
        #print(f'{env.now} prob_balk {prob_balk}')
        list_prob_balk.append(prob_balk)
        
        
class Customer:
    """A customer identified by name, carrying its event timestamps."""

    def __init__(self, name):
        self.name = name
        # All timestamps start at 0 and are filled in as the simulation runs.
        self.arrival_time = self.serve_time = self.leave_time = 0
        

# boot up sim: one run per set-up rate x

for x in (1,):

    paramtest1 = param(x)
    random.seed(paramtest1.RANDOM_SEED)

    # queues shared by the arrival generator and the servers
    batching_q, processing_q = [], []
    server_q = []                       # servers that are still starting up
    delayoff_q = []

    # server counters
    num_servers = 0                     # servers in the system (starting and serving)
    num_active_server = 0               # servers currently serving customers
    num_start = 0

    # waiting-time bookkeeping
    dic_arrival_time = {}
    dic_serve_time = {}
    list_waiting_time = []

    # balking bookkeeping
    num_balk = 0                        # number of balking customers
    num_cumulative_customer = 0         # total arriving customers
    list_prob_balk = []                 # balking probability after each arrival

    # servers setting up, recorded at every state transition
    state_time_trans = []
    num_server_state = []
    pi_ij = []

    # active servers over time, used for E[A]
    time_active = []                    # time of each change
    state_server_active = []            # number of active servers at that time
    delta = []                          # gaps between entries of time_active

    # create and start the model
    env = simpy.Environment()
    arrivals = gen_arrivals(env, batching_q, processing_q, server_q, delayoff_q, paramtest1)
    env.process(arrivals)
    env.run(until=paramtest1.DURATION_TIME)

Upvotes: 0

Views: 50

Answers (1)

Michael
Michael

Reputation: 1914

So you have a server processing a queue of customer batches, but when the queue is empty you want the server to wait a limited amount of time for a new batch. So when the queue is empty, the server is waiting for one of two events to finish. If a batch shows up (a get event) before the limited amount of time elapses (a timeout event), processing continues; if the timeout happens first, the server shuts down and the pending get needs to be cancelled.

When a process needs to wait on more than one event, simpy has the any_of and all_of functions.

So the only real changes were that I changed your processing_q from a list to a resource Store, so servers can issue a get event and wait in an idle state, and that I dropped the delay process.

The process method now handles both the processing of batches and idle time.

import simpy
import random
# import numpy as np

class param:
    """
    Settings for one simulation run.

    x is stored as MEAN_SWITCH_TIME; the server set-up delay used by the
    simulation is 1/MEAN_SWITCH_TIME, so x acts as the set-up rate.
    """

    def __init__(self, x):
        # Durations handed directly to env.timeout(): time between arrivals
        # and time to serve one batch.
        self.MEAN_INTERARRIVAL = self.MEAN_SERVICE_TIME = 1
        # How long an idle server waits for a new batch before shutting down.
        self.MEAN_DELAY_TIME = 3
        # Set-up rate (set-up takes 1/MEAN_SWITCH_TIME).
        self.MEAN_SWITCH_TIME = x

        # Upper bound on servers that may exist at once.
        self.NUM_OF_SERVER = 20
        # Upper bound on customers in the system; arrivals beyond it balk.
        self.MAX_SYS_SIZE = 5_000_000
        # Customers per batch.
        self.BATCH_SIZE = 2

        # Run control: seed for random and the simulated time to run for.
        self.RANDOM_SEED = 0
        self.DURATION_TIME = 12 + 10
        

# batching_q and server_q involve no waiting, so normal lists are good enough
# (processing_q is a simpy.Store so that idle servers can wait on it)

class Server():
    """
    Server that processes batches.

    Life cycle: setting up -> active (serving batches) -> idle (waiting at
    most MEAN_DELAY_TIME for a new batch) -> shut down.  Set-up can be
    interrupted by a running server that takes over the waiting batch.

    Besides its own attributes, the class updates module-level state:
    the counters num_servers / num_active_server and the statistics lists
    state_time_trans, num_server_state, state_server_active, time_active
    and dic_serve_time.
    """

    def __init__(self, id, env, processing_q, server_q, param):
        """
        Store the queues and immediately start the set-up process.

        id:           identifier used in log messages
        env:          simpy environment
        processing_q: simpy.Store of batches waiting for service
        server_q:     list of servers that are still setting up
        param:        simulation settings (see class param)
        """
        global num_servers

        self.id = id
        self.env = env
        self.processing_q = processing_q
        self.server_q = server_q

        # keep a handle on the set-up process so stop() can interrupt it
        self.start_process = self.env.process(self.start_up(param))

        # server exists (setting up), but is not active yet
        num_servers += 1

    # There is no separate delay-off process: the idle wait is handled
    # inside process() by waiting on "batch available OR idle timeout".

    def start_up(self, param):
        """
        starts up the server, then start processing batches

        Set-up takes 1/MEAN_SWITCH_TIME.  start up can be interrupted
        (see stop()), in which case the server never becomes active and
        num_servers is decremented.
        """
        global num_servers

        # record the number of servers setting up at each state transition
        state_time_trans.append(self.env.now)
        num_server_state.append(len(self.server_q))

        try:
            print(f'{self.env.now} server {self.id} start to setup, num server setting up: {len(self.server_q)}')

            yield self.env.timeout(1/param.MEAN_SWITCH_TIME)
            # yield self.env.timeout(np.random.exponential(1/param.MEAN_SWITCH_TIME))

            # server has started, need to remove from startup q
            self.server_q.remove(self)
            print(f'{self.env.now} server {self.id} active and start processing, num server setting up: {len(self.server_q)}')
            state_time_trans.append(self.env.now)
            num_server_state.append(len(self.server_q))

            # hand over to the batch-processing loop as a separate process
            self.env.process(self.process(param))

        except simpy.Interrupt:
            print(f'{self.env.now} server {self.id} has been interupted, num server setting up: {len(self.server_q)}-------------------')
            state_time_trans.append(self.env.now)
            num_server_state.append(len(self.server_q))

            # server is stopping, need to adjust server count
            num_servers -= 1

    def process(self, param):
        """
        process batches and handle idle time

        Repeatedly requests a batch from processing_q.  If no batch is
        available the server is idle and waits at most MEAN_DELAY_TIME;
        when that timeout elapses first, the pending request is cancelled
        and the server shuts down.

        After each served batch, if batches are still waiting and at least
        as many servers are setting up as batches are waiting, one
        setting-up server is stopped.
        """
        global num_servers, num_active_server

        # count server becoming active only once
        num_active_server += 1
        state_server_active.append(num_active_server)
        time_active.append(self.env.now)

        while True:
            get_e = self.processing_q.get()
            idle_timeout_e = self.env.timeout(param.MEAN_DELAY_TIME)

            # The request is triggered right away only if this server really
            # received a batch.  Checking the request (rather than the store
            # length beforehand) stays correct when another waiting server
            # is handed the batch first.
            idle = not get_e.triggered
            if idle:
                num_active_server -= 1
                # this server's own request is already counted in get_queue
                print(f'{self.env.now} server {self.id} idel, num server active: {num_active_server} idel: {len(self.processing_q.get_queue)}')

            yield self.env.any_of([idle_timeout_e, get_e])

            # Test the request itself, not membership in the any_of result:
            # a request that was triggered but not yet processed when the
            # timeout fired has already removed its batch from the store,
            # and cancel() would silently drop that batch.
            if not get_e.triggered:
                # idle timeout has occurred, cancel the pending get
                get_e.cancel()
                print(f'{self.env.now} server {self.id} Shutting down because of idel timeout')
                break

            if idle:
                # was idle, now active again
                num_active_server += 1
                print(f'{self.env.now} server {self.id} active again, num server active: {num_active_server} idel: {len(self.processing_q.get_queue) + 1}')

            b = get_e.value

            yield self.env.timeout(param.MEAN_SERVICE_TIME)
            # yield env.timeout(np.random.exponential(1/param.MEAN_SERVICE_TIME))

            # stamp every customer of the batch with the service completion time
            for customer in b:
                customer.serve_time = self.env.now
                dic_serve_time.update({customer.name: customer.serve_time})
            # names are listed last-to-first; works for any batch size
            served = ',  '.join(str(customer.name) for customer in reversed(b))
            print(f'{self.env.now} server {self.id} finish to serve customer {served}')

            # More batches waiting: this server will take the next one, so a
            # server that is still setting up may no longer be needed.
            waiting = len(self.processing_q.items)
            if waiting > 0 and len(self.server_q) >= waiting:
                s = self.server_q.pop()   # most recently added setting-up server
                s.stop()
                print(f'{self.env.now} server {self.id} continue to take a new batch')

        # now the server is shutting down (it was already counted as
        # inactive when it became idle)
        num_servers -= 1

        state_server_active.append(num_active_server)
        time_active.append(self.env.now)
        print(f'{self.env.now} server {self.id} stopped, num server active: {num_active_server}, idel: {len(self.processing_q.get_queue)}')

    def stop(self):
        """
        Interrupts server start up, stopping server

        Does nothing if the set-up process can no longer be interrupted.
        """
        try:
            self.start_process.interrupt()
        except RuntimeError:
            # simpy raises RuntimeError when the process has already
            # terminated; there is nothing left to stop in that case.
            pass

def gen_arrivals(env, batching_q, processing_q, server_q, param, max_customers=9):
    """
    Generate arriving customers (simpy process).

    One customer arrives every MEAN_INTERARRIVAL until max_customers have
    been created; the generator then ends, so no further arrivals are
    logged or counted.  Pass max_customers=None for an unbounded stream.

    If the system size would exceed MAX_SYS_SIZE the customer balks and is
    removed again.  Once BATCH_SIZE customers are waiting, a batch is put
    into processing_q; if no idle server is waiting on the store, a new
    server is set up (subject to NUM_OF_SERVER).

    env:           simpy environment
    batching_q:    list of customers not yet grouped into a batch
    processing_q:  simpy.Store of batches waiting for service
    server_q:      list of servers that are still setting up
    param:         simulation settings (see class param)
    max_customers: number of customers to generate, or None for no limit

    Updates the module-level counters num_balk and num_cumulative_customer
    and appends the running balking probability to list_prob_balk.
    """
    # num_servers and num_active_server are only read here
    global num_balk, num_cumulative_customer

    next_server_id = 1
    next_name = 1
    while max_customers is None or next_name <= max_customers:
        yield env.timeout(param.MEAN_INTERARRIVAL)
        # yield env.timeout(np.random.exponential(1/param.MEAN_INTERARRIVAL))
        num_cumulative_customer += 1

        customer = Customer(next_name)
        batching_q.append(customer)
        next_name += 1

        # customers waiting: not yet batched + batched but not yet in service
        q_size = len(batching_q) + (param.BATCH_SIZE * len(processing_q.items))
        # customers in the system: waiting + one batch per active server
        sys_size = q_size + (num_active_server * param.BATCH_SIZE)

        if sys_size > param.MAX_SYS_SIZE:
            # the customer balks: undo the append above
            num_balk += 1
            batching_q.pop(-1)
        else:
            print(f'{env.now} customer{customer.name} has arrived, q len: {q_size}, sys len: {sys_size}')
            customer.arrival_time = env.now
            dic_arrival_time.update({customer.name: customer.arrival_time})

            # check if a batch can be created
            while len(batching_q) >= param.BATCH_SIZE:
                batch = batching_q[:param.BATCH_SIZE]
                del batching_q[:param.BATCH_SIZE]

                # put batch in processing q
                processing_q.put(batch)

                # an idle server already has a pending get on the store, so
                # the length of get_queue is the number of idle servers
                print(f'{env.now} new batch arrived, #idel: {len(processing_q.get_queue)}')

                if len(processing_q.get_queue) == 0:
                    # there is no idle server, need to set up a server
                    if num_servers < param.NUM_OF_SERVER:
                        server = Server(next_server_id, env, processing_q, server_q, param)
                        next_server_id += 1
                        server_q.append(server)

        # calculate balking probability so far
        prob_balk = num_balk/num_cumulative_customer
        list_prob_balk.append(prob_balk)
        
        
class Customer:
    """A customer identified by name, carrying its event timestamps."""

    def __init__(self, name):
        self.name = name
        # All timestamps start at 0 and are filled in as the simulation runs.
        self.arrival_time = self.serve_time = self.leave_time = 0
        

# boot up sim: one run per set-up rate x

for x in (1,):
    env = simpy.Environment()

    paramtest1 = param(x)
    random.seed(paramtest1.RANDOM_SEED)

    # queues shared by the arrival generator and the servers
    batching_q = []
    processing_q = simpy.Store(env)     # servers with a pending get on it are idle
    server_q = []                       # servers that are still starting up

    # server counters
    num_servers = 0                     # servers in the system (starting and serving)
    num_active_server = 0               # servers currently serving customers
    num_start = 0

    # waiting-time bookkeeping
    dic_arrival_time = {}
    dic_serve_time = {}
    list_waiting_time = []

    # balking bookkeeping
    num_balk = 0                        # number of balking customers
    num_cumulative_customer = 0         # total arriving customers
    list_prob_balk = []                 # balking probability after each arrival

    # servers setting up, recorded at every state transition
    state_time_trans = []
    num_server_state = []
    pi_ij = []

    # active servers over time, used for E[A]
    time_active = []                    # time of each change
    state_server_active = []            # number of active servers at that time
    delta = []                          # gaps between entries of time_active

    # create and start the model
    arrivals = gen_arrivals(env, batching_q, processing_q, server_q, paramtest1)
    env.process(arrivals)
    env.run(until=paramtest1.DURATION_TIME)

Upvotes: 0

Related Questions