# -*- coding: utf-8 -*-
"""MRIA Model
Purpose
-------
The Multiregional Impact Assessment (MRIA) Model allows for estimating a new post-disaster economic situation in equilibrium, given a set of disruptions.
References
----------
1) Koks, E. E., & Thissen, M. (2016). A multiregional impact assessment model for disaster analysis. Economic Systems Research, 28(4), 429-449.
"""
import os
import numpy as np
import pandas as pd
from pyomo.environ import (ConcreteModel, Constraint, Objective, Param, Set,
SetOf, Var, minimize)
from pyomo.opt import SolverFactory
from vtra.mria.ratmarg import ratmarg_IO
from vtra.utils import load_config
[docs]class MRIA_IO(object):
"""MRIA_IO sets up the modelling framework.
In this class we define the type of model, sets, set up the core variables and specify the
constraints and objectives for different model setups.
Attributes
----------
name : string
name of the model
m : Pyomo.ConcreteModel
model instance
regions : list
model regions
total_regions : int
number of regions
sectors : list
model sectors
fd_cat : list
final demand categories
"""
def __init__(self, name, list_regions, list_sectors, list_fd_cats=[]):
"""Create a Concrete Model, specify the regions and sectors to include.
Parameters
----------
name : str
name of the model
list_regions : list
regions to include in the model calculations
list_sectors : list
sectors to include in the model calculations
list_fd_cats : list
the final demand categories in the table. This will be aggregated to one column.
"""
self.name = name
self.m = ConcreteModel()
self.regions = list_regions
self.total_regions = len(list_regions)
self.sectors = list_sectors
self.fd_cat = list_fd_cats
[docs] def create_sets(self, FD_SET=None, VA_SET=None):
"""Create of the various sets, allowing for specification of set inputs.
Parameters
----------
FD_SET : list or None, optional
final demand categories, default to an **empty list**
VA_SET : list or None, optional
value added categories, default to an **empty list**
"""
if FD_SET is None:
FD_SET = []
if VA_SET is None:
VA_SET = []
self.m.S = Set(initialize=self.sectors, doc='sectors')
self.m.rROW = Set(initialize=self.regions, ordered=True,
doc='regions including export')
self.m.R = Set(initialize=self.regions, ordered=True, doc='regions')
self.m.fdemand = Set(initialize=self.fd_cat, doc='Final Demand')
self.m.VA = Set(initialize=VA_SET, doc='value added')
[docs] def create_alias(self):
"""Set aliases.
Parameters
----------
list_regions : list[str]
regions to include in the model calculations
list_sectors : list[str]
sectors to include in the model calculations
list_fd_cats : list[str]
the final demand categories in the table. This will be aggregated to one column.
"""
self.m.Rb = SetOf(self.m.R) # an alias of region R
self.m.r = SetOf(self.m.R) # an alias of region R
self.m.Sb = SetOf(self.m.S) # an alias of sector S
[docs] def create_A_mat(self, A_mat_in):
"""Creation of the A-matrix for the optimization model.
Parameters
----------
A_mat_in
A-matrix dictionary from the **io_basic** class object
"""
model = self.m
def A_matrix_init(model, R, S, Rb, Sb):
return A_mat_in[R, S, Rb, Sb]
model.A_matrix = Param(model.R, model.S, model.R, model.Sb,
initialize=A_matrix_init, doc='A matrix')
self.A_matrix = model.A_matrix
[docs] def create_FD(self, FinalD, disr_dict_fd):
"""Specify Final Demand and Local Final Demand.
Parameters
----------
FinalD
Final Demand dictionary from the **io_basic** class object
disr_dict_fd
dictionary containing the disruptions in final demand
Create *self*.ttfd - Pyomo Parameter instance for the total final demand
and *self*.fd - Pyomo Parameter instance for the final demand
"""
disrupted_des = list(np.unique([x[1] for x in disr_dict_fd]))
model = self.m
model.Rdes = Set(initialize=disrupted_des, doc='Final Demand')
def tfd_init(model, R, S, Rb):
if (R, Rb, S) in list(disr_dict_fd.keys()):
return sum(FinalD[R, S, Rb, fdemand] for fdemand in model.fdemand)*disr_dict_fd[R, Rb, S]
else:
return sum(FinalD[R, S, Rb, fdemand] for fdemand in model.fdemand)
def fd_init(model, R, S):
return sum(model.tfd[R, S, Rb] for Rb in model.Rb)
model.tfd = Param(model.R, model.S, model.Rb, initialize=tfd_init, doc='Final Demand')
model.fd = Param(model.R, model.S, initialize=fd_init, doc='Final Demand')
self.ttfd = model.tfd
self.fd = model.fd
[docs] def create_LFD(self, FinalD):
"""Specify local final demand
Parameters
----------
FinalD
Final Demand dictionary from the **io_basic** class object
Create *self*.lfd - Pyomo Parameter instance for the local final demand
"""
model = self.m
def lfd_init(model, R, S):
return sum(FinalD[R, S, R, fdemand] for fdemand in model.fdemand)
model.lfd = Param(model.R, model.S, initialize=lfd_init, doc='Final Demand')
self.lfd = model.lfd
[docs] def create_ExpImp(self, ExpROW, ImpROW):
"""Specify export and import to the rest of the world
Parameters
----------
ExpROW
Exports to the Rest of the World dictionary from the **io_basic** class object
ImpROW
Imports from the Rest of the World dictionary from the **io_basic** class object
Create *self*.ExpROW - Pyomo Parameter instance for the Exports to the Rest of the World
and *self*.ImpROW - Pyomo Parameter instance for the Imports from the Rest of the World
"""
model = self.m
# Specify Export ROW
def ExpROW_ini(m, R, S):
return (ExpROW[R, S, 'Export'])
model.ExpROW = Param(model.R, model.S, initialize=ExpROW_ini,
doc='Exports to the rest of the world')
# Specify Import ROW
def ImpROW_init(m, R, S):
return (ImpROW[R, S, 'Import'])
model.ImpROW = Param(model.R, model.S, initialize=ImpROW_init,
doc='Imports from the rest of the world')
self.ExpROW = model.ExpROW
self.ImpROW = model.ImpROW
[docs] def create_X_up(self, disr_dict, Regmaxcap=0.98):
"""Specify upper bound of total production **X**.
Parameters
----------
disr_dict : dict
dictionary containing the reduction in production capacity
Regmaxcap : float, optional
maximum regional capacity. The default value is set to **0.98**
Create *self*.X_up - Pyomo Parameter instance for the upper bound of total production **X**
"""
model = self.m
def shock_init(model, R, S):
if (R, S) in list(disr_dict.keys()):
return disr_dict[R, S]
else:
return 1.05
model.X_up = Param(model.R, model.S, initialize=shock_init,
doc='Maximum production capacity')
self.X_up = model.X_up
[docs] def create_Xbase(self, Z_matrix, disr_dict, FinalD=None):
"""Specify Baseline value of total production **X**
Parameters
----------
Z_matrix
Z-matrix dictionary from the **io_basic** class object
disr_dict
dictionary containing the reduction in production capacity
FinalD
Final Demand dictionary from the **io_basic** class object
Create *self*.X_up - Pyomo Parameter instance for the Baseline value of total production **X**
"""
model = self.m
if self.fd.active is not True:
self.create_FD(FinalD, disr_dict)
if self.ExpROW.active is not True:
self.create_ExpImp(Z_matrix)
def x_init_base(model, R, S):
return(sum(Z_matrix[R, S, Rb, Sb] for Rb in model.Rb for Sb in
model.Sb) + self.fd[R, S] + self.ExpROW[R, S])
model.Xbase = Param(model.R, model.S, initialize=x_init_base,
doc='Total Production baseline')
self.Xbase = model.Xbase
[docs] def create_X(self, disr_dict, Regmaxcap=0.98, A_matrix_ini=None, Z_matrix=None,
FinalD=None, Xbase=None, fd=None, ExpROW=None):
"""Create the total production **X** variable
Parameters
----------
disr_dict
dictionary containing the reduction in production capacity
Regmaxcap
maximum regional capacity. The default value is set to **0.98**
A_matrix_ini
A-matrix dictionary from the **io_basic** class object
Z_matrix
Z-matrix dictionary from the **io_basic** class object
FinalD
Final Demand dictionary from the **io_basic** class object
Xbase
Total Production **X** parameter from the **MRIA** class object
fd
Final Demand parameter from the **MRIA** class object
ExpROW
Export to the Rest of the World parameter from the **MRIA** class object
Create *self*.X_up - Pyomo Variable instance of total production **X**
"""
model = self.m
if self.Xbase.active is not True:
self.create_Xbase(Z_matrix, FinalD)
if self.A_matrix.active is not True:
self.create_A_mat(A_matrix_ini)
def X_bounds(model, R, S):
if (R, S) in list(disr_dict.keys()):
return (0.0, (1/Regmaxcap*self.Xbase[R, S])*disr_dict[R, S])
else:
return (0.0, (1/Regmaxcap*self.Xbase[R, S])*1.1)
def x_init(model, R, S):
return(sum(self.A_matrix[R, S, Rb, Sb]*self.Xbase[Rb, Sb] for Rb in model.Rb
for Sb in model.Sb) + self.fd[R, S] + self.ExpROW[R, S])
model.X = Var(model.R, model.S, bounds=X_bounds,
initialize=x_init, doc='Total Production')
self.X = model.X
[docs] def create_VA(self, ValueA):
"""Specify Value Added
Parameters
----------
ValueA
Value Added dictionary from the **io_basic** class object
Create *self*.ValueA - Pyomo Parameter instance for the total Value Added
"""
model = self.m
def va_init(model, R, S):
return ValueA[R, S, 'VA']
model.ValueA = Param(model.R, model.S, initialize=va_init, doc='Value Added')
self.ValueA = model.ValueA
[docs] def create_Z_mat(self):
"""Specify Trade between regions
Create *self*.Z_matrix - Pyomo Parameter instance for the total trade matrix
"""
model = self.m
def Z_matrix_init(model, R, S, Rb, Sb):
return self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb]
model.Z_matrix = Param(model.R, model.S, model.R, model.Sb,
initialize=Z_matrix_init, doc='Z matrix')
self.Z_matrix = model.Z_matrix
[docs] def create_Trade(self, FinalD, Z_matrix=None):
"""Create Trade Matrix
Parameters
----------
FinalD
Final Demand dictionary from the **io_basic** class object
Z_matrix
Z-matrix dictionary from the **io_basic** class object
Create *self*.trade - Pyomo Parameter instance for the trade matrix between regions
"""
model = self.m
def Trade_init(model, R, Rb, S):
while R != Rb:
return sum(self.Z_matrix[Rb, S, R, i] for i in model.Sb) + sum(FinalD[Rb, S, R, i] for i in model.fdemand)
model.trade = Param(model.R, model.Rb, model.S, initialize=Trade_init, doc='Trade')
self.trade = model.trade
[docs] def create_TotExp(self):
"""Estimate Total Export
Create *self*.TotExp - Pyomo Parameter instance for the total export
"""
model = self.m
def totexp_init(model, R, S):
return sum(self.trade[Rb, R, S] for Rb in model.Rb if (R != Rb))
model.TotExp = Param(model.R, model.S, initialize=totexp_init,
doc='Total exports between regions')
self.TotExp = model.TotExp
[docs] def create_TotImp(self):
"""Estimate Total Import
Create *self*.TotExp - Pyomo Parameter instance for the total import
"""
model = self.m
def totimp_init(model, R, S):
return sum(self.trade[R, Rb, S] for Rb in model.Rb if (R != Rb))
model.TotImp = Param(model.R, model.S, initialize=totimp_init,
doc='Total imports between regions')
self.TotImp = model.TotImp
[docs] def create_ImpShares(self):
"""Estimate Import shares and Import share DisImp
Create *self*.ImportShare - Pyomo Parameter instance for the total import shares
"""
model = self.m
def impsh_init(model, R, Rb, S):
# & ((sum(self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb] for Sb in model.Sb) + self.fd[Rb, S]) != None):
while (self.trade[Rb, R, S] != None):
try:
return self.trade[Rb, R, S]/(sum(self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb] for Sb in model.Sb) + self.fd[Rb, S])
except ZeroDivisionError:
return 0
def impshdis_init(model, R, Rb, S):
return (sum(self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb] for Sb in model.Sb) + self.fd[Rb, S])
model.ImportShare = Param(model.R, model.Rb, model.S,
initialize=impsh_init, doc='Importshare of each region')
model.ImportShareDisImp = Param(
model.R, model.Rb, model.S, initialize=impshdis_init, doc='Importshare DisImp of each region')
self.ImportShare = model.ImportShare
self.ImportShareDisImp = model.ImportShareDisImp
[docs] def create_Rdem(self):
"""Create reconstruction demand variable.
Create *self*.Rdem - Pyomo Parameter instance for the total reconstruction demand
"""
model = self.m
model.Rdem = Param(model.R, model.S, initialize=0, doc='Reconstruction demand')
self.Rdem = model.Rdem
[docs] def create_Rat(self, FinalD=None, Z_matrix=None):
"""Create rationing variable
Parameters
----------
FinalD
Final Demand dictionary from the **io_basic** class object
Z_matrix
Z-matrix dictionary from the **io_basic** class object
Set *self*.Rat - Pyomo Variable instance for rationing
"""
model = self.m
if self.lfd.active is not True:
self.create_LFD(FinalD)
if self.ExpROW.active is not True:
self.create_ExpImp(Z_matrix)
def Rat_bounds(model, R, S):
return (0, abs(self.lfd[R, S]+self.ExpROW[R, S]))
model.Rat = Var(model.R, model.S, bounds=Rat_bounds, initialize=0, doc='Rationing')
self.Rat = model.Rat
[docs] def create_Ratmarg(self, Table):
"""Estimate the marginal values of the rationing variable
Parameters
----------
Table
the **io_basic** class object
Set *self*.Ratmarg - Pyomo Parameter instance for the marginal value of rationing
"""
model = self.m
try:
data_path = load_config()['paths']['data']
RatMarg = pd.read_csv(os.path.join(data_path, 'input_data',
'Ratmarg_{}.csv'.format(self.name)), index_col=[0], header=0)
if (set(list(RatMarg.index.values)) != set(list(self.regions))):
RatMarg = ratmarg_IO(Table)
except:
RatMarg = ratmarg_IO(Table)
Ratmarginal = {(r, k): v for r, kv in RatMarg.iterrows()
for k, v in kv.to_dict().items()}
model.Ratmarg = Param(model.R, model.S, initialize=Ratmarginal,
doc='Rationing marginal', mutable=True)
self.Ratmarg = model.Ratmarg
[docs] def create_DisImp(self, disr_dict, Regmaxcap=0.98):
"""Create disaster import variable.
Parameters
----------
disr_dict
dictionary containing the reduction in production capacity
Regmaxcap
maximum regional capacity. The default value is set to **0.98**
Set *self*.DisImp - Pyomo Variable instance for disaster imports
"""
model = self.m
disrupted_ctry = list(np.unique([x[0] for x in disr_dict]))
# problem regions
dimp_ctry = ['KEN', 'UGA']
dimp_ind = ['i3']
def Dis_bounds(model, R, S):
if R in dimp_ctry and S in dimp_ind:
return (0, 0)
elif (model.X_up[R, S] < (1.05) or R in disrupted_ctry):
return (0, None)
else:
return (0, None)
model.DisImp = Var(model.R, model.S, bounds=Dis_bounds,
initialize=0, doc='Disaster Imports')
self.DisImp = model.DisImp
[docs] def create_demand(self):
"""Specify demand function
Create *self*.Demand - Pyomo Variable instance for total demand
"""
model = self.m
def demand_init(model, R, S):
return (
sum(self.A_matrix[R, S, R, Sb]*self.X[R, Sb]
for Sb in model.Sb) + self.lfd[R, S] + self.Rdem[R, S] - self.Rat[R, S]
+ sum(self.ImportShare[R, Rb, S]*(sum(self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb]
for Sb in model.Sb) + self.fd[Rb, S] + self.Rdem[Rb, S] - self.Rat[R, S]) for Rb in model.Rb if (R != Rb))
+ sum(self.ImportShare[R, Rb, S]*(self.DisImp[Rb, S])
for Rb in model.Rb if (R != Rb))
+ self.ExpROW[R, S]
)
model.Demand = Var(model.R, model.S, bounds=(0.0, None), initialize=demand_init)
self.Demand = model.Demand
#
# Create baseline dataset to use in model
#
[docs] def baseline_data(self, Table, disr_dict_sup, disr_dict_fd):
"""Set up all the baseline variables for the MRIA model
Parameters
----------
Table
the **io_basic** class object
disr_dict_sup
dictionary containing the reduction in production capacity
disr_dict_fd
dictionary containing the disruptions in final demand
Set all required parameters and variables for the **MRIA_IO** class and the **MRIA**
model.
"""
self.create_ExpImp(Table.ExpROW, Table.ImpROW)
self.create_A_mat(Table.A_matrix)
self.create_FD(Table.FinalD, disr_dict_fd)
self.create_LFD(Table.FinalD)
self.create_Xbase(Table.Z_matrix, Table.FinalD, disr_dict_fd)
self.create_X(disr_dict_sup, Z_matrix=Table.Z_matrix, FinalD=Table.FinalD)
self.create_VA(Table.ValueA)
self.create_Z_mat()
self.create_Trade(Table.FinalD)
self.create_TotExp()
self.create_TotImp()
self.create_ImpShares()
[docs] def impact_data(self, Table, disr_dict_sup, disr_dict_fd, Regmaxcap=0.98):
"""Create additional parameters and variables required for impact analysis
Parameters
----------
Table
the **io_basic** class object
disr_dict_sup
dictionary containing the reduction in production capacity
disr_dict_fd
dictionary containing the disruptions in final demand
Regmaxcap
maximum regional capacity. The default value is set to **0.98**
Set all additional parameters and variables required for the **MRIA_IO** class and the
**MRIA** model to do an impact analysis.
"""
self.create_X_up(disr_dict_sup)
self.create_Rdem()
self.create_Rat(Table.FinalD, Table.Z_matrix)
self.create_Ratmarg(Table)
self.create_DisImp(disr_dict_sup)
self.create_demand()
#
# Set up baseline model
#
[docs] def run_basemodel(self, solver=None):
"""Run the baseline model of the MRIA model.
This should return the baseline situation (i.e. no changes between input matrix and
output matrix).
Parameters
----------
solver : str
Specify the solver to be used with Pyomo. The Default value is set to
**None**. If set to **None**, the ipopt solver will be used
Write out the output of an optimized **MRIA_IO** class and the **MRIA** model
"""
model = self.m
if solver is None:
solver = 'ipopt'
def demSup(model, R, S):
return (self.X[R, S] >=
sum(self.A_matrix[R, S, R, Sb]*self.X[R, Sb]
for Sb in model.Sb) + self.lfd[R, S]
+ sum(self.ImportShare[R, Rb, S]*(sum(self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb]
for Sb in model.Sb) + self.fd[Rb, S]) for Rb in model.Rb if (R != Rb))
+ self.ExpROW[R, S])
model.demSup = Constraint(model.R, model.S, rule=demSup, doc='Satisfy demand')
def objective_base(model):
return sum(self.X[R, S] for R in model.R for S in model.S)
model.objective = Objective(rule=objective_base, sense=minimize,
doc='Define objective function')
opt = SolverFactory(solver)
if solver is 'ipopt':
opt.options['warm_start_init_point'] = 'yes'
opt.options['warm_start_bound_push'] = 1e-6
opt.options['warm_start_mult_bound_push'] = 1e-6
opt.options['mu_init'] = 1e-6
results = opt.solve(model, tee=True)
# sends results to stdout
results.write()
[docs] def run_impactmodel(self, solver=None, output=None, tol=1e-6, DisWeight=1.75, RatWeight=2):
"""Run the **MRIA** model with disruptions. This will return an economy with a new
equilibrium, based on the new production and demand values.
Parameters
----------
solver
Specify the solver to be used with Pyomo. The Default value is set to **None**. If set to **None**, the ipopt solver will be used
output
Specify whether you want the solver to print its progress.The default value is set to **None**
tol
the tolerance value that determines whether the outcome of the model is feasible. The default value is set to **1e-6**
DisWeight
the weight that determines the penalty set to let the model allow for additional imports. A higher penalty value will result in less imports. The default value is set to **1.75**
RatWeight
the weight that determines the penalty set to let the model allow to ration goods. A higher penalty value will result in less rationing. The default value is set to **2**
Optionally write out the output of an optimized **MRIA_IO** class and **MRIA** model
"""
model = self.m
if solver is None:
solver = 'ipopt'
if DisWeight is None:
DisWeight = 1.75
if RatWeight is None:
RatWeight = 2
def demDisRat(model, R, S):
return (
self.Demand[R, S] == (sum(self.A_matrix[R, S, R, Sb]*self.X[R, Sb] for Sb in model.Sb) + self.lfd[R, S] + self.Rdem[R, S] - self.Rat[R, S]
+ sum(self.ImportShare[R, Rb, S]*(sum(self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb] for Sb in model.Sb) +
self.fd[Rb, S] + self.Rdem[Rb, S] - self.Rat[Rb, S]) for Rb in model.Rb if (R != Rb))
+ sum(self.ImportShare[R, Rb, S]*(self.DisImp[Rb, S])
for Rb in model.Rb if (R != Rb))
+ self.ExpROW[R, S])
)
model.demDisRat = Constraint(model.R, model.S, rule=demDisRat, doc='Satisfy demand')
def demsupDis(model, R, S):
return (self.DisImp[R, S]+self.X[R, S]) >= self.Demand[R, S]
model.demsupDis = Constraint(model.R, model.S, rule=demsupDis, doc='Satisfy demand')
def DisImpA(model, R, S):
return (self.DisImp[R, S]*(self.DisImp[R, S] + (self.Xbase[R, S]*self.X_up[R, S]) - self.Demand[R, S])) == 0
model.DisImpA = Constraint(model.R, model.S, rule=DisImpA, doc='Satisfy demand')
def DisImpEq(model, R, S):
# return m.DisImp[R, S] >= (m.Demand[R, S] - (m.X[R, S]))
return self.DisImp[R, S] >= (self.Demand[R, S] - (self.X[R, S]*self.X_up[R, S]))
# model.DisImpEq = Constraint(model.R, model.S, rule=DisImpEq, doc='Satisfy demand')
def ObjectiveDis2(model):
return (
sum(self.X[R, S] for S in model.S for R in model.R)
+ DisWeight*sum((self.Ratmarg[R, S]*self.DisImp[R, S])
for R in model.R for S in model.S)
+ RatWeight*sum((self.Ratmarg[R, S]*self.Rat[R, S])
for R in model.R for S in model.S)
+ sum((sum(self.ImportShare[R, Rb, S]*(sum(self.A_matrix[R, S, Rb, Sb]*self.X[Rb, Sb] for Sb in model.Sb) + self.fd[Rb, S] + self.Rdem[Rb, S] - self.Rat[Rb, S]) for Rb in model.Rb if (R != Rb))
+ sum(self.ImportShare[R, Rb, S]*(self.DisImp[Rb, S]) for Rb in model.Rb if (R != Rb))) for R in model.R for S in model.S)
)
model.objective = Objective(rule=ObjectiveDis2, sense=minimize,
doc='Define objective function')
opt = SolverFactory(solver)
if solver is 'ipopt':
opt.options['max_iter'] = 5000
opt.options['warm_start_init_point'] = 'yes'
opt.options['warm_start_bound_push'] = 1e-6
opt.options['warm_start_mult_bound_push'] = 1e-6
opt.options['mu_init'] = 1e-6
if tol != 1e-6:
opt.options['tol'] = tol
if output is None:
opt.solve(model, tee=False)
else:
results = opt.solve(model, tee=True)
# sends results to stdout
results.write()