# -*- coding: utf-8 -*-
###############################################################################
# Copyright (c), Forschungszentrum Jülich GmbH, IAS-1/PGI-1, Germany. #
# All rights reserved. #
# This file is part of the AiiDA-FLEUR package. #
# #
# The code is hosted on GitHub at https://github.com/JuDFTteam/aiida-fleur #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.flapw.de or #
# http://aiida-fleur.readthedocs.io/en/develop/ #
###############################################################################
"""
In this module you find the workflow 'FleurRelaxWorkChain' for geometry optimization.
"""
from __future__ import absolute_import
from __future__ import print_function
import copy
import numpy as np
import six
from aiida.engine import WorkChain, ToContext, while_, if_
from aiida.engine import calcfunction as cf
from aiida.orm import load_node
from aiida.orm import StructureData, Dict
from aiida.common import AttributeDict
from aiida.common.exceptions import NotExistent
from aiida_fleur.workflows.scf import FleurScfWorkChain
from aiida_fleur.calculation.fleur import FleurCalculation as FleurCalc
from aiida_fleur.common.constants import bohr_a
from aiida_fleur.tools.StructureData_util import break_symmetry_wf
[docs]class FleurRelaxWorkChain(WorkChain):
"""
This workflow performs structure optimization.
"""
_workflowversion = '0.2.0'
_wf_default = {
'relax_iter': 5,
'film_distance_relaxation': False,
'force_criterion': 0.001,
'run_final_scf': False,
'break_symmetry': False,
'change_mixing_criterion': 0.025,
'atoms_off': [] # '49' is reserved
}
@classmethod
def define(cls, spec):
super(FleurRelaxWorkChain, cls).define(spec)
spec.expose_inputs(FleurScfWorkChain, namespace='scf')
spec.input('wf_parameters', valid_type=Dict, required=False)
spec.outline(
cls.start,
cls.converge_scf,
cls.check_failure,
while_(cls.condition)(
cls.generate_new_fleurinp,
cls.converge_scf,
cls.check_failure,
),
cls.get_results_relax,
if_(cls.should_run_final_scf)(cls.run_final_scf, cls.get_results_final_scf),
cls.return_results,
)
spec.output('out', valid_type=Dict)
spec.output('optimized_structure', valid_type=StructureData)
# exit codes
spec.exit_code(230, 'ERROR_INVALID_INPUT_PARAM', message='Invalid workchain parameters.')
spec.exit_code(350, 'ERROR_DID_NOT_RELAX', message='Optimization cycle did not lead to convergence of forces.')
spec.exit_code(351, 'ERROR_SCF_FAILED', message='SCF Workchains failed for some reason.')
spec.exit_code(352, 'ERROR_NO_RELAX_OUTPUT', message='Found no relaxed structure info in the output of SCF')
spec.exit_code(353, 'ERROR_NO_SCF_OUTPUT', message='Found no SCF output')
spec.exit_code(354, 'ERROR_SWITCH_BFGS', message='Force is small, switch to BFGS')
spec.exit_code(311,
'ERROR_VACUUM_SPILL_RELAX',
message='FLEUR calculation failed because an atom spilled to the'
'vacuum during relaxation')
spec.exit_code(313, 'ERROR_MT_RADII_RELAX', message='Overlapping MT-spheres during relaxation.')
[docs] def start(self):
"""
Retrieve and initialize paramters of the WorkChain
"""
self.report('INFO: started structure relaxation workflow version {}\n' ''.format(self._workflowversion))
self.ctx.info = []
self.ctx.warnings = []
self.ctx.errors = []
# Pre-initialization of some variables
self.ctx.loop_count = 0
self.ctx.forces = []
self.ctx.final_cell = None
self.ctx.final_atom_positions = None
self.ctx.pbc = None
self.ctx.reached_relax = True
self.ctx.switch_bfgs = False
self.ctx.scf_res = None
self.ctx.final_structure = None
# initialize the dictionary using defaults if no wf paramters are given
wf_default = copy.deepcopy(self._wf_default)
if 'wf_parameters' in self.inputs:
wf_dict = self.inputs.wf_parameters.get_dict()
else:
wf_dict = wf_default
extra_keys = []
for key in wf_dict.keys():
if key not in wf_default.keys():
extra_keys.append(key)
if extra_keys:
error = 'ERROR: input wf_parameters for Relax contains extra keys: {}'.format(extra_keys)
self.report(error)
return self.exit_codes.ERROR_INVALID_INPUT_PARAM
# extend wf parameters given by user using defaults
for key, val in six.iteritems(wf_default):
wf_dict[key] = wf_dict.get(key, val)
self.ctx.wf_dict = wf_dict
if '49' in wf_dict['atoms_off']:
error = '"49" label for atoms_off is reserved for internal use'
self.report(error)
return self.exit_codes.ERROR_INVALID_INPUT_PARAM
[docs] def converge_scf(self):
"""
Submits :class:`aiida_fleur.workflows.scf.FleurScfWorkChain`.
"""
inputs = {}
if self.ctx.loop_count:
inputs = self.get_inputs_scf()
else:
inputs = self.get_inputs_first_scf()
res = self.submit(FleurScfWorkChain, **inputs)
return ToContext(scf_res=res)
[docs] def check_failure(self):
"""
Throws an exit code if scf failed
"""
try:
scf_wc = self.ctx.scf_res
except AttributeError:
message = 'ERROR: Something went wrong I do not have new atom positions calculation'
self.control_end_wc(message)
return self.exit_codes.ERROR_NO_SCF_OUTPUT
if not scf_wc.is_finished_ok:
exit_statuses = FleurScfWorkChain.get_exit_statuses(['ERROR_FLEUR_CALCULATION_FAILED'])
if scf_wc.exit_status == exit_statuses[0]:
fleur_calc = load_node(scf_wc.outputs.output_scf_wc_para.get_dict()['last_calc_uuid'])
if fleur_calc.exit_status == FleurCalc.get_exit_statuses(['ERROR_VACUUM_SPILL_RELAX'])[0]:
self.control_end_wc('Failed due to atom and vacuum overlap')
return self.exit_codes.ERROR_VACUUM_SPILL_RELAX
elif fleur_calc.exit_status == FleurCalc.get_exit_statuses(['ERROR_MT_RADII_RELAX'])[0]:
self.control_end_wc('Failed due to MT overlap')
return self.exit_codes.ERROR_MT_RADII_RELAX
return self.exit_codes.ERROR_SCF_FAILED
[docs] def condition(self):
"""
Checks if relaxation criteria is achieved.
:return: True if structure is optimised and False otherwise
"""
scf_wc = self.ctx.scf_res
try:
last_calc = load_node(scf_wc.outputs.output_scf_wc_para.dict.last_calc_uuid)
except (NotExistent, AttributeError):
# TODO: throw exit code
# message = 'ERROR: Did not manage to read the largest force'
# self.control_end_wc(message)
# return self.exit_codes.ERROR_RELAX_FAILED
return False
else:
forces_data = last_calc.outputs.relax_parameters.get_dict()['posforces'][-1]
all_forces = []
for force in forces_data:
all_forces.extend(force[-3:])
all_forces = [abs(x) for x in all_forces]
self.ctx.forces.append(max(all_forces))
largest_now = self.ctx.forces[-1]
if largest_now < self.ctx.wf_dict['force_criterion']:
self.report('Structure is converged to the largest force ' '{}'.format(self.ctx.forces[-1]))
return False
elif largest_now < self.ctx.wf_dict['change_mixing_criterion'] and self.inputs.scf.wf_parameters['force_dict'][
'forcemix'] == 'straight':
self.report('Seems it is safe to switch to BFGS. Current largest force: ' '{}'.format(self.ctx.forces[-1]))
self.ctx.switch_bfgs = True
return False
self.ctx.loop_count = self.ctx.loop_count + 1
if self.ctx.loop_count == self.ctx.wf_dict['relax_iter']:
self.ctx.reached_relax = False
self.report('INFO: Reached optimization iteration number {}. Largest force is {}, '
'force criterion is {}'.format(self.ctx.loop_count + 1, largest_now,
self.ctx.wf_dict['force_criterion']))
return False
self.report('INFO: submit optimization iteration number {}. Largest force is {}, '
'force criterion is {}'.format(self.ctx.loop_count + 1, largest_now,
self.ctx.wf_dict['force_criterion']))
return True
[docs] def generate_new_fleurinp(self):
"""
This function fetches relax.xml from the previous iteration and calls
:meth:`~aiida_fleur.workflows.relax.FleurRelaxWorkChain.analyse_relax()`.
New FleurinpData is stored in the context.
"""
# TODO do we loose provenance here, which we like to keep?
scf_wc = self.ctx.scf_res
last_calc = load_node(scf_wc.outputs.output_scf_wc_para.get_dict()['last_calc_uuid'])
try:
relax_parsed = last_calc.outputs.relax_parameters
except NotExistent:
return self.exit_codes.ERROR_NO_SCF_OUTPUT
new_fleurinp = self.analyse_relax(relax_parsed)
self.ctx.new_fleurinp = new_fleurinp
[docs] @staticmethod
def analyse_relax(relax_dict):
"""
This function generates a new fleurinp analysing parsed relax.xml from the previous
calculation.
**NOT IMPLEMENTED YET**
:param relax_dict: parsed relax.xml from the previous calculation
:return new_fleurinp: new FleurinpData object that will be used for next relax iteration
"""
# TODO: implement this function, now always use relax.xml generated in FLEUR
should_relax = False
if should_relax:
return 1
return None
[docs] def should_run_final_scf(self):
"""
Check if a final scf should be run on the optimized structure
"""
return self.ctx.wf_dict.get('run_final_scf', False)
[docs] def run_final_scf(self):
"""
Run a final scf for charge convergence on the optimized structure
"""
inputs = {}
inputs = self.get_inputs_final_scf()
res = self.submit(FleurScfWorkChain, **inputs)
return ToContext(scf_final_res=res)
[docs] def get_results_relax(self):
"""
Generates results of the workchain.
Creates a new structure data node which is an
optimized structure.
"""
try:
relax_out = self.ctx.scf_res.outputs.last_fleur_calc_output
except NotExistent:
return self.exit_codes.ERROR_NO_SCF_OUTPUT
relax_out = relax_out.get_dict()
try:
cell = relax_out['relax_brav_vectors']
atom_positions = relax_out['relax_atom_positions']
film = relax_out['film']
total_energy = relax_out['energy']
total_energy_units = relax_out['energy_units']
except KeyError:
return self.exit_codes.ERROR_NO_RELAX_OUTPUT
self.ctx.total_energy_last = total_energy
self.ctx.total_energy_units = total_energy_units
self.ctx.final_cell = cell
self.ctx.final_atom_positions = atom_positions
if film == 'True':
self.ctx.pbc = (True, True, False)
else:
self.ctx.pbc = (True, True, True)
# we build the structure here, that way we can run an scf afterwards
if self.ctx.final_cell:
np_cell = np.array(self.ctx.final_cell) * bohr_a
structure = StructureData(cell=np_cell.tolist())
for atom in self.ctx.final_atom_positions:
np_pos = np.array(atom[1:])
pos_abs = np_pos @ np_cell
if self.ctx.pbc == (True, True, True):
structure.append_atom(position=(pos_abs[0], pos_abs[1], pos_abs[2]), symbols=atom[0])
else: # assume z-direction is orthogonal to xy
structure.append_atom(position=(pos_abs[0], pos_abs[1], atom[3] * bohr_a), symbols=atom[0])
structure.pbc = self.ctx.pbc
self.ctx.final_structure = structure
[docs] def get_results_final_scf(self):
"""
Parser some results of final scf
"""
try:
scf_out = self.ctx.scf_final_res.outputs.last_fleur_calc_output
except NotExistent:
return self.exit_codes.ERROR_NO_SCF_OUTPUT
scf_out_d = scf_out.get_dict()
try:
total_energy = scf_out_d['energy']
total_energy_units = scf_out_d['energy_units']
except KeyError:
self.report('ERROR: Could not parse total energy of final scf run')
#return self.exit_codes.ERROR_NO_RELAX_OUTPUT
self.ctx.total_energy_last = total_energy
self.ctx.total_energy_units = total_energy_units
[docs] def return_results(self):
"""
This function stores results of the workchain into the output nodes.
"""
#TODO maybe we want to have a more detailed array output node with the force and
# position history of all atoms?
out = {
'workflow_name': self.__class__.__name__,
'workflow_version': self._workflowversion,
'energy': self.ctx.total_energy_last,
'energy_units': self.ctx.total_energy_units,
'info': self.ctx.info,
'warnings': self.ctx.warnings,
'errors': self.ctx.errors,
'force': self.ctx.forces,
'force_iter_done': self.ctx.loop_count,
# uuids in the output are bad for caching should be avoided,
# instead better return the node.
'last_scf_wc_uuid': self.ctx.scf_res.uuid
}
outnode = Dict(dict=out)
con_nodes = {}
try:
relax_out = self.ctx.scf_res.outputs.last_fleur_calc_output
except NotExistent:
relax_out = None
if relax_out is not None:
con_nodes['last_fleur_calc_output'] = relax_out
if self.ctx.wf_dict.get('run_final_scf', False):
try:
scf_out = self.ctx.scf_final_res.outputs.last_fleur_calc_output
except NotExistent:
scf_out = None
if relax_out is not None:
con_nodes['last_scf__output'] = scf_out
# TODO: for a trajectory output node all corresponding nodes have to go into
# con_nodes
if self.ctx.final_structure is not None:
outdict = create_relax_result_node(out=outnode, optimized_structure=self.ctx.final_structure, **con_nodes)
else:
outdict = create_relax_result_node(out=outnode, **con_nodes)
# return output nodes
for link_name, node in six.iteritems(outdict):
self.out(link_name, node)
if not self.ctx.reached_relax:
return self.exit_codes.ERROR_DID_NOT_RELAX
if self.ctx.switch_bfgs:
return self.exit_codes.ERROR_SWITCH_BFGS
[docs] def control_end_wc(self, errormsg):
"""
Controlled way to shutdown the workchain. It will initialize the output nodes
The shutdown of the workchain will has to be done afterwards.
"""
self.report(errormsg)
self.ctx.errors.append(errormsg)
self.return_results()
[docs]@cf
def create_relax_result_node(**kwargs):
"""
This calcfunction assures the right provenance (additional links)
for ALL result nodes it takes any nodes as input
and return a special set of nodes.
All other inputs will be connected in the DB to these ourput nodes
"""
outdict = {}
for key, val in six.iteritems(kwargs):
if key == 'out': # should always be present
outnode = val.clone() # dublicate node instead of circle (keep DAG)
outnode.label = 'out_relax_wc_para'
outnode.description = ('Contains results and information of an FleurRelaxWorkChain run.')
outdict['out'] = outnode
if key == 'optimized_structure':
structure = val.clone() # dublicate node instead of circle (keep DAG)
structure.label = 'optimized_structure'
structure.description = ('Relaxed structure result of an FleurRelaxWorkChain run.')
outdict['optimized_structure'] = structure
return outdict