Reputation: 15
I would like to model a system with servers that need time to set up before they are ready to serve. A server starts setting up whenever at least 2 customers (one batch) are waiting in the queue.
When a batch (a group of 2 customers) leaves the system, we check whether another batch is waiting to be served. If so, the server stays active and serves that batch immediately, and we interrupt a server that is still setting up. Otherwise, we keep the server in idle mode for a while (a delay time) before turning it off. If a batch arrives during that time, we interrupt the delay process and the idle server serves the batch immediately.
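To make the intended delay-off behaviour concrete, here is a minimal standalone sketch with a single toy server (ToyServer, IDLE_DELAY and the other names are only illustrative, this is not my actual model): the server serves batches while any exist, otherwise it idles for a delay window and shuts down, unless a new arrival interrupts the idle timer.
import simpy

IDLE_DELAY = 3     # delay-off window before the server turns off
SERVICE_TIME = 1   # time to serve one batch

class ToyServer:
    """Single toy server: serve batches while any exist, otherwise idle for
    IDLE_DELAY and shut down, unless a new arrival interrupts the idle timer."""
    def __init__(self, env, batches):
        self.env = env
        self.batches = batches
        self.idling = False
        self.proc = env.process(self.run())

    def run(self):
        while True:
            if self.batches:
                batch = self.batches.pop(0)
                yield self.env.timeout(SERVICE_TIME)
                print(f'{self.env.now} served batch {batch}')
            else:
                self.idling = True
                try:
                    yield self.env.timeout(IDLE_DELAY)
                    print(f'{self.env.now} delay-off expired, shutting down')
                    return
                except simpy.Interrupt:
                    print(f'{self.env.now} delay-off interrupted by a new batch')
                finally:
                    self.idling = False

def arrivals(env, batches, server):
    for b in range(3):
        yield env.timeout(2)
        batches.append(b)
        if server.idling:
            server.proc.interrupt()   # wake the idle server immediately

env = simpy.Environment()
batches = []
server = ToyServer(env, batches)
env.process(arrivals(env, batches, server))
env.run()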
The version without the delay process is already working, thanks to the answer by Michael R. Gibbs (Simulation of servers processing batches with interrupting set-up/switch-on times). I tried to modify that code to add the delay process, but there is a problem: a server that is interrupted during setup still goes through the delay process instead of shutting down immediately, as I expected.
Here is my code:
import simpy
import random
# import numpy as np
class param:
def __init__(self, x):
#self.FILE = 'Setup_time.csv'
self.MEAN_INTERARRIVAL = 1 # arrival rate
self.MEAN_SERVICE_TIME = 1 # service rate
self.MEAN_DELAY_TIME = 3 # delayoff
self.MEAN_SWITCH_TIME = x # setup rate
self.NUM_OF_SERVER = 20 # maximum number of servers can be setup
self.MAX_SYS_SIZE = 5000000 # maximum number of customers in the system
self.BATCH_SIZE = 2
self.RANDOM_SEED = 0
self.DURATION_TIME = 12
# there is no waiting so normal lists are good enough
class Server():
"""
Server that process batches
Has two states: starting up, and batch processing
"""
def __init__(self, id, env, processing_q, server_q, delayoff_q, param):
self.id = id
self.env = env
self.processing_q = processing_q
self.server_q = server_q
self.start_process = self.env.process(self.start_up(param))
# self.delay_process = self.env.process(self.delayoff(param))
global num_servers
# server has started, but not active
num_servers +=1
def delayoff(self, param):
"""
keep server in idle mode before turning it off (delayoff)
delayoff can be interrupted if a batch arrives; the server then reactivates to serve the batch.
"""
global num_servers
try:
print(f'{env.now} server {self.id} start delayoff, delayoff servers:{len(delayoff_q)}, setup servers:{len(server_q)}')
yield self.env.timeout(param.MEAN_DELAY_TIME)
# delayoff_q.remove(self)
print(f'{env.now} server {self.id} shutdown after delayoff, delayoff servers:{len(delayoff_q)}, setup servers:{len(server_q)}')
except simpy.Interrupt:
print(f'{self.env.now} server {self.id} is interrupted (idle server steals batch), # batches in queue: {len(processing_q)}')
def start_up(self, param):
"""
starts up the server, then starts processing batches
start up can be interrupted, stopping the server
"""
global num_servers,num_active_server
# start up
state_time_trans.append(self.env.now)
num_server_state.append(len(server_q))
try:
print(f'{self.env.now} server {self.id} start to setup, num server setting up: {len(server_q)}')
yield self.env.timeout(1/param.MEAN_SWITCH_TIME)
# yield self.env.timeout(np.random.exponential(1/param.MEAN_SWITCH_TIME))
# server has started, need to remove from startup q
self.server_q.remove(self)
print(f'{self.env.now} server {self.id} active and start processing, num server setting up: {len(server_q)}')
state_time_trans.append(env.now)
num_server_state.append(len(server_q))
self.env.process(self.process(param))
except simpy.Interrupt:
print(f'{env.now} server {self.id} has been interrupted, num server setting up: {len(server_q)}-------------------')
state_time_trans.append(env.now)
num_server_state.append(len(server_q))
# server is stopping, need to adjust server count
num_servers -= 1
def process(self, param):
"""
process batches
keeps going as long as there are batches in queue
If it starts a second batch, it also interrupts a starting-up server
"""
global num_servers, num_active_server
# count server becoming active only once
num_active_server += 1
#Save the time a server active
state_server_active.append(num_active_server)
time_active.append(env.now)
#print(f'{self.env.now} server {self.id} active, num server active: {num_active_server}')
while True:
# server becomes active only once, this is counting batches processed
# num_active_server += 1
b = processing_q.pop(0)
yield self.env.timeout(param.MEAN_SERVICE_TIME)
# yield env.timeout(np.random.exponential(1/param.MEAN_SERVICE_TIME))
for i in range(0, len(b)):
b[i].serve_time = env.now
dic_serve_time.update({b[i].name: b[i].serve_time})
print(f'{self.env.now} server {self.id} finished serving customers {b[1].name}, {b[0].name}')
# Server is still running
if len(self.processing_q) > 0:
# more processes to do,
# steal batch from starting up server
#s = self.server_q.pop() # lifo
#s.stop()
# need to check the length, not if it exists
if len(self.server_q) >= len(self.processing_q):
#if len(self.server_q) > 0:
s = self.server_q.pop() #fifo
s.stop()
print(f'{self.env.now} server {self.id} continue to take a new batch')
# else:
# print(f'{env.now} server {self.id} no more batches, shut down')
# break
else:
# now the server is idle
num_active_server -= 1
delayoff_q.append(self)
self.env.process(self.delayoff(param))
print(f'{env.now} server {self.id} delayoff, have {len(delayoff_q)} idle servers')
break
# now the server is shutting down
#num_active_server -= 1
num_servers -= 1
state_server_active.append(num_active_server)
time_active.append(env.now)
#print(f'{self.env.now} server {self.id} active, num server active: {num_active_server}')
def stop(self):
"""
Interrupts server start up, stopping server
"""
try:
self.start_process.interrupt()
except:
pass
def gen_arrivals(env, batching_q, processing_q, server_q, delayoff_q, param):
"""
Generate arriving customers
If the queues are too big the customer will abort
If there are enough customers, create a batch and start a server
"""
global num_servers, num_balk, num_cumulative_customer, num_active_server
id = 1
name = 1
while True:
yield env.timeout(param.MEAN_INTERARRIVAL)
# yield env.timeout(np.random.exponential(1/param.MEAN_INTERARRIVAL))
num_cumulative_customer += 1
if name < 10000000000:
customer = Customer(name)
batching_q.append(customer)
name += 1
q_size = len(batching_q) + (param.BATCH_SIZE * len(processing_q))
sys_size = q_size + (num_active_server * param.BATCH_SIZE)
#if q_size > max_q_size:
if sys_size > param.MAX_SYS_SIZE:
num_balk += 1
batching_q.pop(-1)
#print(f'{env.now} customer {customer.name} arrived and aborted, sys len: {sys_size }')
else:
#customer = object()
#batching_q.append(customer)
print(f'{env.now} customer{customer.name} has arrived, q len: {q_size}, sys len: {sys_size}')
customer.arrival_time = env.now
dic_arrival_time.update({customer.name: customer.arrival_time})
# check if a batch can be created
while len(batching_q) >= param.BATCH_SIZE:
batch = list()
while len(batch) < param.BATCH_SIZE:
batch.append(batching_q.pop(0))
# put batch in processing q
processing_q.append(batch)
print(f'{env.now} new batch arrived, #delay: {len(delayoff_q)}')
if len(delayoff_q) == 0: #there is no idle server, need to setup a server
if num_servers < param.NUM_OF_SERVER:
server = Server(id, env, processing_q, server_q, delayoff_q, param)
id += 1
server_q.append(server)
# print(f'{env.now} setup a new server {server.id}')
else:
s_delay = delayoff_q.pop(0)
env.process(s_delay.delayoff(param)).interrupt()
env.process(s_delay.process(param))
#Calculate balking probability
prob_balk = num_balk/num_cumulative_customer
#print(f'{env.now} prob_balk {prob_balk}')
list_prob_balk.append(prob_balk)
class Customer:
def __init__(self, name):
self.name = name
self.arrival_time = 0
self.serve_time = 0
self.leave_time = 0
# boot up sim
for x in [1]:
paramtest1 = param(x)
random.seed(paramtest1.RANDOM_SEED)
#Save waiting time
dic_serve_time = {}
dic_arrival_time = {}
list_waiting_time =[]
batching_q = list()
processing_q = list()
server_q = list() # servers that are still starting up
delayoff_q = list()
num_servers = 0 # number of server in system (both starting and serving server)
num_active_server = 0 # number of servers serving customers
#Save balking customers
num_balk = 0 # number of balking customers
num_cumulative_customer = 0 # total arriving customers
list_prob_balk = [] #list balk prob each trial
num_start =0
#Save average start up server
num_server_state = []
state_time_trans = []
pi_ij = []
#Save average active server E[A]
state_server_active = [] #number server active each time
time_active = [] #the time a server active
delta = [] #the time between each setting up or distance between term in time_active list
# create and start the model
env = simpy.Environment()
env.process(gen_arrivals(env, batching_q, processing_q, server_q, delayoff_q, paramtest1))
env.run(paramtest1.DURATION_TIME)
Upvotes: 0
Views: 50
Reputation: 1914
So you have a server processing a queue of customer batches, but when the queue is empty you want the server to wait a limited amount of time for a new batch. While the queue is empty, the server is waiting for one of two events: if a batch shows up (a get event) before the time limit (a timeout event), processing continues; if the timeout happens first, the server shuts down and the pending get needs to be cancelled.
When a process needs to wait on more than one event, simpy has the any_of and all_of functions.
So the only real changes were: I changed your processing_q from a list to a simpy Store so servers can issue a get event and wait in an idle state, and I dropped the delay process.
The process method now handles both the processing of batches and idle time.
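Here is the wait-for-either pattern in isolation, as a minimal sketch outside the full model (worker, feeder and idle_limit are made-up names just for illustration): the worker waits on either a get from the store or an idle timeout using any_of, and cancels the pending get if the timeout wins.
import simpy

def worker(env, store, idle_limit=3):
    """Wait on either a store get or an idle timeout; if the timeout wins,
    cancel the pending get and shut down."""
    while True:
        get_e = store.get()
        idle_timeout_e = env.timeout(idle_limit)
        fired = yield env.any_of([get_e, idle_timeout_e])
        if get_e in fired:
            print(f'{env.now} got item {get_e.value}, processing')
            yield env.timeout(1)       # stand-in for the service time
        else:
            get_e.cancel()             # drop the queued get request
            print(f'{env.now} idle timeout, shutting down')
            return

def feeder(env, store):
    for i in range(3):
        yield env.timeout(2)
        yield store.put(i)

env = simpy.Environment()
store = simpy.Store(env)
env.process(worker(env, store))
env.process(feeder(env, store))
env.run()
Cancelling the get is what keeps a server that has already shut down from later pulling a batch off the store.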
import simpy
import random
# import numpy as np
class param:
def __init__(self, x):
#self.FILE = 'Setup_time.csv'
self.MEAN_INTERARRIVAL = 1 # arrival rate
self.MEAN_SERVICE_TIME = 1 # service rate
self.MEAN_DELAY_TIME = 3 # delayoff
self.MEAN_SWITCH_TIME = x # setup rate
self.NUM_OF_SERVER = 20 # maximum number of servers can be setup
self.MAX_SYS_SIZE = 5000000 # maximum number of customers in the system
self.BATCH_SIZE = 2
self.RANDOM_SEED = 0
self.DURATION_TIME = 12 + 10
# there is no waiting so normal lists are good enough
class Server():
"""
Server that process batches
Has two states: starting up, and batch processing
"""
def __init__(self, id, env, processing_q, server_q, param):
self.id = id
self.env = env
self.processing_q = processing_q
self.server_q = server_q
self.start_process = self.env.process(self.start_up(param))
# self.delay_process = self.env.process(self.delayoff(param))
global num_servers
# server has started, but not active
num_servers +=1
# def delayoff(self, param):
# """
# keep server in idle mode before turning it off (delayoff)
# delayoff can be interrupted if any batch arrives, then reactive to serve a batch.
# """
# global num_servers
# try:
# print(f'{env.now} server {self.id} start delayoff, delayoff servers:{len(delayoff_q)}, setup servers:{len(server_q)}')
# yield self.env.timeout(param.MEAN_DELAY_TIME)
# # delayoff_q.remove(self)
# print(f'{env.now} server {self.id} shutdown after delayoff, delayoff servers:{len(delayoff_q)}, setup servers:{len(server_q)}')
# except simpy.Interrupt:
# print(f'{self.env.now} server{self.id} is interupted (idle server steal batch), # batches in queue: {len(processing_q.items )}')
def start_up(self, param):
"""
starts up the server, then starts processing batches
start up can be interrupted, stopping the server
"""
global num_servers,num_active_server
# start up
state_time_trans.append(self.env.now)
num_server_state.append(len(server_q))
try:
print(f'{self.env.now} server {self.id} start to setup, num server setting up: {len(server_q)}')
yield self.env.timeout(1/param.MEAN_SWITCH_TIME)
# yield self.env.timeout(np.random.exponential(1/param.MEAN_SWITCH_TIME))
# server has started, need to remove from startup q
self.server_q.remove(self)
print(f'{self.env.now} server {self.id} active and start processing, num server setting up: {len(server_q)}')
state_time_trans.append(env.now)
num_server_state.append(len(server_q))
self.env.process(self.process(param))
except simpy.Interrupt:
print(f'{env.now} server {self.id} has been interrupted, num server setting up: {len(server_q)}-------------------')
state_time_trans.append(env.now)
num_server_state.append(len(server_q))
# server is stopping, need to adjust server count
num_servers -= 1
def process(self, param):
"""
process batches
keeps going as long as there are batches in queue
If it starts a second batch, it also interrupts a starting-up server
"""
global num_servers, num_active_server
# count server becoming active only once
num_active_server += 1
#Save the time a server active
state_server_active.append(num_active_server)
time_active.append(env.now)
#print(f'{self.env.now} server {self.id} active, num server active: {num_active_server}')
while True:
# server becomes active only once, this is counting batches processed
# num_active_server += 1
idle_flag = False
if len(self.processing_q.items ) == 0:
# nothing to process, will be idle
idle_flag = True
num_active_server -= 1
print(f'{self.env.now} server {self.id} idle, num server active: {num_active_server} idle: {len(processing_q.get_queue) + 1}')
idle_timeout_e = self.env.timeout(param.MEAN_DELAY_TIME)
get_e = processing_q.get()
fired_events = yield env.any_of([idle_timeout_e, get_e])
if get_e in fired_events.keys():
if idle_flag:
# was idle, now active again
idle_flag = False
num_active_server += 1
print(f'{self.env.now} server {self.id} active again, num server active: {num_active_server} idle: {len(processing_q.get_queue) + 1}')
b = get_e.value
yield self.env.timeout(param.MEAN_SERVICE_TIME)
# yield env.timeout(np.random.exponential(1/param.MEAN_SERVICE_TIME))
for i in range(0, len(b)):
b[i].serve_time = env.now
dic_serve_time.update({b[i].name: b[i].serve_time})
print(f'{self.env.now} server {self.id} finished serving customers {b[1].name}, {b[0].name}')
# Server is still running
if len(self.processing_q.items ) > 0:
# more processes to do,
# steal batch from starting up server
#s = self.server_q.pop() # lifo
#s.stop()
# need to check the length, not if it exists
if len(self.server_q) >= len(self.processing_q.items ):
#if len(self.server_q) > 0:
s = self.server_q.pop() #fifo
s.stop()
print(f'{self.env.now} server {self.id} continue to take a new batch')
# else:
# print(f'{env.now} server {self.id} no more batches, shut down')
# break
else:
# idle timeout has occurred
# cancel the pending get
get_e.cancel()
print(f'{env.now} server {self.id} shutting down because of idle timeout')
break
# now the server is shutting down
#num_active_server -= 1
num_servers -= 1
state_server_active.append(num_active_server)
time_active.append(env.now)
print(f'{self.env.now} server {self.id} stopped, num server active: {num_active_server}, idle: {len(processing_q.get_queue)}')
def stop(self):
"""
Interrupts server start up, stopping server
"""
try:
self.start_process.interrupt()
except:
pass
def gen_arrivals(env, batching_q, processing_q, server_q, param):
"""
Generate arriving customers
If the queues are too big the customer will abort
If there are enough customers, create a batch and start a server
"""
global num_servers, num_balk, num_cumulative_customer, num_active_server
id = 1
name = 1
while True:
yield env.timeout(param.MEAN_INTERARRIVAL)
# yield env.timeout(np.random.exponential(1/param.MEAN_INTERARRIVAL))
num_cumulative_customer += 1
if name < 10:
customer = Customer(name)
batching_q.append(customer)
name += 1
q_size = len(batching_q) + (param.BATCH_SIZE * len(processing_q.items ))
sys_size = q_size + (num_active_server * param.BATCH_SIZE)
#if q_size > max_q_size:
if sys_size > param.MAX_SYS_SIZE:
num_balk += 1
batching_q.pop(-1)
#print(f'{env.now} customer {customer.name} arrived and aborted, sys len: {sys_size }')
else:
#customer = object()
#batching_q.append(customer)
print(f'{env.now} customer{customer.name} has arrived, q len: {q_size}, sys len: {sys_size}')
customer.arrival_time = env.now
dic_arrival_time.update({customer.name: customer.arrival_time})
# check if a batch can be created
while len(batching_q) >= param.BATCH_SIZE:
batch = list()
while len(batch) < param.BATCH_SIZE:
batch.append(batching_q.pop(0))
# put batch in processing q
processing_q.put(batch)
print(f'{env.now} new batch arrived, #idle: {len(processing_q.get_queue)}')
if len(processing_q.get_queue) == 0: #there is no idle server, need to setup a server
if num_servers < param.NUM_OF_SERVER:
server = Server(id, env, processing_q, server_q, param)
id += 1
server_q.append(server)
# print(f'{env.now} setup a new server {server.id}')
# else:
# s_delay = delayoff_q.pop(0)
# env.process(s_delay.delayoff(param)).interrupt()
# env.process(s_delay.process(param))
# the idle server already has a get request on the processing_q store
#Calculate balking probability
prob_balk = num_balk/num_cumulative_customer
#print(f'{env.now} prob_balk {prob_balk}')
list_prob_balk.append(prob_balk)
class Customer:
def __init__(self, name):
self.name = name
self.arrival_time = 0
self.serve_time = 0
self.leave_time = 0
# boot up sim
for x in [1]:
env = simpy.Environment()
paramtest1 = param(x)
random.seed(paramtest1.RANDOM_SEED)
#Save waiting time
dic_serve_time = {}
dic_arrival_time = {}
list_waiting_time =[]
batching_q = list()
processing_q = simpy.Store(env) # queued servers are idle
server_q = list() # servers that are still starting up
#delayoff_q = list()
num_servers = 0 # number of server in system (both starting and serving server)
num_active_server = 0 # number of servers serving customers
#Save balking customers
num_balk = 0 # number of balking customers
num_cumulative_customer = 0 # total arriving customers
list_prob_balk = [] #list balk prob each trial
num_start =0
#Save average start up server
num_server_state = []
state_time_trans = []
pi_ij = []
#Save average active server E[A]
state_server_active = [] #number server active each time
time_active = [] #the time a server active
delta = [] #the time between each setting up or distance between term in time_active list
# create and start the model
env.process(gen_arrivals(env, batching_q, processing_q, server_q, paramtest1))
env.run(paramtest1.DURATION_TIME)
Upvotes: 0