Files
ortools-clone/examples/python/no_wait_baking_scheduling_sat.py
2024-07-22 11:31:53 +02:00

310 lines
10 KiB
Python

#!/usr/bin/env python3
# Copyright 2010-2024 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Scheduling cooking tasks in a bakery using no-wait jobshop scheduling.
We are scheduling a full day of baking:
- Work starts at 4am.
- The shop closes at 10PM.
- Durations are in minutes. Time starts at midnight.
"""
import collections
from typing import Sequence
from absl import app
from absl import flags
from google.protobuf import text_format
from ortools.sat.python import cp_model
_PARAMS = flags.DEFINE_string(
"params", "num_search_workers:16, max_time_in_seconds:30", "Sat solver parameters."
)
_PROTO_FILE = flags.DEFINE_string(
"proto_file", "", "If not empty, output the proto to this file."
)
# Recipes
CROISSANT = "croissant"
APPLE_PIE = "apple pie"
BRIOCHE = "brioche"
CHOCOLATE_CAKE = "chocolate cake"
# Skills
BAKING = "baking"
PROOFING = "proofing"
COOKING = "cooking"
COOLING = "cooling"
DECORATING = "decorating"
DISPLAY = "display"
class Task:
"""A unit baking task.
- Simple baking tasks have a fixed duration. They are performed by workers.
- Waiting/cooling/proofing tasks have a min and a max duration.
They are performed by machine or they use space resources.
"""
def __init__(self, name, min_duration, max_duration):
self.name = name
self.min_duration = min_duration
self.max_duration = max_duration
class Skill:
"""The skill of a worker or the capability of a machine."""
def __init__(self, name, efficiency):
self.name = name
# Efficiency is currently not used.
self.efficiency = efficiency
class Recipe:
"""A recipe is a sequence of cooking tasks."""
def __init__(self, name):
self.name = name
self.tasks = []
def add_task(self, resource_name, min_duration, max_duration):
self.tasks.append(Task(resource_name, min_duration, max_duration))
return self
class Resource:
"""A resource is a worker, a machine, or just some space for cakes to rest.
- Workers have a capacity of 1 and can have variable efficiency.
- Machines and spaces have a capacity greater or equal to one, but the
efficiency is fixed to 100.
For a worker with efficiency k and a task of duration t, the resulting
work will have a duration `ceil(t * k)`.
"""
def __init__(self, name, capacity):
self.name = name
self.capacity = capacity
self.skills = []
def add_skill(self, skill_name, efficiency):
self.skills.append(Skill(skill_name, efficiency))
return self
class Order:
"""An order is a recipe that should be delivered at a given due date."""
def __init__(self, unique_id, recipe_name, due_date, quantity):
"""Builds an order.
Args:
unique_id: A unique identifier for the order. Used to display the result.
recipe_name: The name of the recipe. It must match one of the recipes.
due_date: The due date in minutes since midnight.
quantity: How many cakes to prepare.
"""
self.unique_id = unique_id
self.recipe_name = recipe_name
self.due_date = due_date
self.quantity = quantity
def set_up_data():
"""Set up the bakery problem data."""
# Recipes.
croissant_recipe = Recipe(CROISSANT)
croissant_recipe.add_task(BAKING, 15, 15)
croissant_recipe.add_task(PROOFING, 60, 90)
croissant_recipe.add_task(COOKING, 20, 20)
croissant_recipe.add_task(DISPLAY, 5, 5 * 60)
apple_pie_recipe = Recipe(APPLE_PIE)
apple_pie_recipe.add_task(BAKING, 25, 25)
apple_pie_recipe.add_task(PROOFING, 15, 60)
apple_pie_recipe.add_task(COOKING, 30, 30)
apple_pie_recipe.add_task(DECORATING, 5, 5)
apple_pie_recipe.add_task(DISPLAY, 5, 5 * 60)
brioche_recipe = Recipe(BRIOCHE)
brioche_recipe.add_task(BAKING, 20, 20)
brioche_recipe.add_task(PROOFING, 60, 90)
brioche_recipe.add_task(COOKING, 30, 30)
brioche_recipe.add_task(DISPLAY, 5, 5 * 60)
chocolate_cake_recipe = Recipe(CHOCOLATE_CAKE)
chocolate_cake_recipe.add_task(BAKING, 15, 15)
chocolate_cake_recipe.add_task(COOKING, 25, 25)
chocolate_cake_recipe.add_task(DECORATING, 15, 15)
chocolate_cake_recipe.add_task(DISPLAY, 5, 5 * 60)
recipes = [
croissant_recipe,
apple_pie_recipe,
brioche_recipe,
chocolate_cake_recipe,
]
# Resources.
baker1 = Resource("baker1", 1).add_skill(BAKING, 1.0)
baker2 = Resource("baker2", 1).add_skill(BAKING, 1.0)
decorator1 = Resource("decorator1", 1).add_skill(DECORATING, 1.0)
waiting_space = Resource("waiting_space", 4).add_skill(PROOFING, 1.0)
oven = Resource("oven", 4).add_skill(COOKING, 1.0)
display_space = Resource("display_space", 12).add_skill(DISPLAY, 1.0)
resources = [baker1, baker2, decorator1, waiting_space, oven, display_space]
# Orders
croissant_7am = Order("croissant_7am", CROISSANT, 7 * 60, 3)
croissant_8am = Order("croissant_8am", CROISSANT, 8 * 60, 3)
croissant_9am = Order("croissant_9am", CROISSANT, 9 * 60, 2)
croissant_10am = Order("croissant_10am", CROISSANT, 10 * 60, 1)
croissant_11am = Order("croissant_11am", CROISSANT, 11 * 60, 1)
brioche_10am = Order("brioche_10am", BRIOCHE, 10 * 60, 8)
brioche_12pm = Order("brioche_12pm", BRIOCHE, 12 * 60, 8)
apple_pie_1pm = Order("apple_pie_1pm", APPLE_PIE, 13 * 60, 10)
chocolate_4pm = Order("chocolate_4pm", CHOCOLATE_CAKE, 16 * 60, 10)
orders = [
croissant_7am,
croissant_8am,
croissant_9am,
croissant_10am,
croissant_11am,
brioche_10am,
brioche_12pm,
apple_pie_1pm,
chocolate_4pm,
]
return recipes, resources, orders
def solve_with_cp_sat(recipes, resources, orders):
"""Build the optimization model, and solve the problem."""
model = cp_model.CpModel()
horizon = 22 * 60 # 10PM.
start_work = 4 * 60 # 4am.
# Parse recipes.
recipe_by_name = {}
for recipe in recipes:
recipe_by_name[recipe.name] = recipe
# Parse resources.
resource_by_name = {}
resource_list_by_skill_name = collections.defaultdict(list)
for resource in resources:
resource_by_name[resource.name] = resource
for skill in resource.skills:
resource_list_by_skill_name[skill.name].append(resource)
# Parse orders and create one optional copy per eligible resource and per
# task.
interval_list_by_resource_name = collections.defaultdict(list)
orders_sequence_of_events = collections.defaultdict(list)
sorted_orders = []
tardiness_vars = []
for order in orders:
for batch in range(order.quantity):
order_id = f"{order.unique_id}_{batch}"
sorted_orders.append(order_id)
previous_end = None
due_date = order.due_date
recipe = recipe_by_name[order.recipe_name]
for task in recipe.tasks:
skill_name = task.name
suffix = f"_{order.unique_id}_batch{batch}_{skill_name}"
start = None
if previous_end is None:
start = model.new_int_var(start_work, horizon, f"start{suffix}")
orders_sequence_of_events[order_id].append(
(start, f"start{suffix}")
)
else:
start = previous_end
size = model.new_int_var(
task.min_duration, task.max_duration, f"size{suffix}"
)
end = None
if task == recipe.tasks[-1]:
# The order must end after the due_date. Ideally, exactly at the
# due_date.
tardiness = model.new_int_var(0, horizon - due_date, f"end{suffix}")
end = tardiness + due_date
# Store the end_var for the objective.
tardiness_vars.append(tardiness)
else:
end = model.new_int_var(start_work, horizon, f"end{suffix}")
orders_sequence_of_events[order_id].append((end, f"end{suffix}"))
previous_end = end
# Per resource copy.
presence_literals = []
for resource in resource_list_by_skill_name[skill_name]:
presence = model.new_bool_var(f"presence{suffix}_{resource.name}")
copy = model.new_optional_interval_var(
start, size, end, presence, f"interval{suffix}_{resource.name}"
)
interval_list_by_resource_name[resource.name].append(copy)
presence_literals.append(presence)
# Only one copy will be performed.
model.add_exactly_one(presence_literals)
# Create resource constraints.
for resource in resources:
intervals = interval_list_by_resource_name[resource.name]
if resource.capacity == 1:
model.add_no_overlap(intervals)
else:
model.add_cumulative(intervals, [1] * len(intervals), resource.capacity)
# The objective is to minimize the sum of the tardiness values of each jobs.
# The tardiness is difference between the end time of an order and its
# due date.
model.minimize(sum(tardiness_vars))
# Solve model.
solver = cp_model.CpSolver()
if _PARAMS.value:
text_format.Parse(_PARAMS.value, solver.parameters)
solver.parameters.log_search_progress = True
status = solver.solve(model)
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for order_id in sorted_orders:
print(f"{order_id}:")
for time_expr, event_id in orders_sequence_of_events[order_id]:
time = solver.value(time_expr)
print(f" {event_id} at {time // 60}:{time % 60:02}")
def main(argv: Sequence[str]) -> None:
if len(argv) > 1:
raise app.UsageError("Too many command-line arguments.")
recipes, resources, orders = set_up_data()
solve_with_cp_sat(recipes, resources, orders)
if __name__ == "__main__":
app.run(main)