Source code for conesToolBox.conesClasses

import os
import sys
import numpy as np
from .conesFunctions import *
from .conesErrors import *
from itertools import cycle

"""
This module provides utility classes for the CONES framework.

:Author: Paolo Errante
:Version: 1.0.0
"""

[docs] class conesParams: """ A Class for Parameters. It has to be developed. """ def __init__(self): """ Initialize the conesParam class """ self.id = None self.type = None self.value = None self.patchName = None
[docs] class conesClip: """ A Class for clippings :Author: Paolo Errante :Version: 1.0.0 """ def __init__(self, name): """ Initialize the conesClip class. :param name: The name of the clipping :type name: str """ self.path = None self.name = name self.verbose = False self.nRanks = None self.ranks = [] self.localCellList = [] self.globalCellList = []
[docs] def get_clip_data(self, globalCellList): """ Reads and sets the clippings cell lists :param globalCellList: List of global cells ids :type globalCellList: list :returns: None """ clipGlobalCellList = [] clipLocalCellList = [] for irank in range(0,self.nRanks): fpath = self.path + "processor"+str(irank) + "/constant/polyMesh/sets/"+self.name localRankCellList = [] globalRankCellList = [] with open(fpath) as file: lines = file.readlines() nCells = int(lines[17].strip()) for icell in range(0,nCells): localCellID= int(lines[19+icell].strip()) globalCellID = local2Global(localCellID, globalCellList, irank) globalRankCellList.append(globalCellID) localRankCellList.append(localCellID) localRankCellList.sort() globalRankCellList.sort() clipLocalCellList.append(localRankCellList) clipGlobalCellList.append(globalRankCellList) if(self.verbose): printCones("Clip:", self.name, " localCellList ", clipLocalCellList) printCones("Clip:", self.name, " globalCellList", clipGlobalCellList) self.localCellList = clipLocalCellList self.globalCellList = clipGlobalCellList return
[docs] def report(self): """ Print to stdout a report of the clipping object """ print("\n================ CLIP REPORT ==================\n") print("Name: ", self.name) print("Path: " , self.path) print("ranks: ", self.ranks) print("local cells ID: " , self.localCellList) print("global cells ID: ", self.globalCellList) print("\n===============================================\n") return
[docs] class conesSettings: """ A class that groups settings for cones """ def __init__(self, casePath): """ Initialize the class :parameter casePath: the path of the source OpenFOAM case :type casePath: str """ self.checkPath(casePath) self.name = "conesSettings" self.verbose = False self.casePath = casePath self.nRanks = self.get_ranks() # Number of openfoam ranks for one simulation self.systemPath = casePath + "system/" self.controlDict = self.systemPath + "controlDict" self.conesDict = self.systemPath + "conesDict" self.obsDict = self.systemPath + "obsDict" self.obsType = self.get_option("obsType", self.obsDict) self.Clips = [] self.mergedClips = [] self.regions = [] self.globalCells = self.setGlobalCells() self.localCells = self.setLocalCells() self.globalStateCells = None self.localStateCells = [] self.stateVar = self.get_option("stateVar", self.conesDict) self.nParams = int(self.get_option("numberParameters", self.conesDict)) self.stateEstSwitch = self.get_option("stateEstSwitch", self.conesDict) == "true" self.paramEstSwitch = self.get_option("paramEstSwitch", self.conesDict) == "true" # This is provisional and not used yet self.params = [] for i in range(0,self.nParams): self.params.append(conesParams()) # We get clips by default self.Clips = self.get_clippings()#self.get_Global_clippings() # We merge those clips that shares cells self.mergedClips = self.mergeClips() self.obsWindow = int(self.get_option("observationWindow", self.conesDict)) self.endTimeSim = float(self.get_option("endTime", self.controlDict)) self.deltaTSim = float(self.get_option("deltaT", self.controlDict)) self.total_it = int(self.endTimeSim / (self.obsWindow * self.deltaTSim))
[docs] def checkPath(self, path): """ Check if path exists :param path: The file directory :type path: str :returns: Boolean :rtype: bool """ printCones("Checking ", path) if not os.path.exists(path): try: raise conesPathNotFound(path) except conesPathNotFound as e: print(e, file=sys.stderr) sys.exit(1) return True
[docs] def get_option(self, keyword, dictionary): """ A function to get options in an OpenFOAM dictionary :param keyword: the name of the option to get :type keyword: str :param dictionary: the name of the dictionary that contains the keyword :type dictionary: str :returns: The value set for the keyword """ # Check file exist try: self.checkPath(dictionary) except conesPathNotFound as e: print(e, file=stderr) if(self.verbose): printCones(" Looking for", keyword, "in", dictionary) with open(dictionary, 'r') as file: lines = file.readlines() for line in lines: #if the line contains the keyword if line.find(keyword) != -1: #if the keyword is an option and not an entry if line.strip().replace(';','').split(" ")[0] == keyword: value = line.strip().replace(';','').split(" ")[-1] if(value[0] == "\""): value = value[1:-1] if(self.verbose): printCones("Found ", keyword, " = ", value) return value
[docs] def mergeClips(self): """ Given a list of conesClip, finds clippings that shares cells and merge them :returns: the list of conesClip after merge :rtype: list """ results = mergeClips(self.Clips, self.globalCells) clipList = [] for i, newClip in enumerate(results): clipList.append(conesClip("newClip"+str(i))) clipGlobalList = [[] for _ in range(self.nRanks)] clipLocalList = [[] for _ in range(self.nRanks)] for cell in newClip: rankCell, localID = global2Local(cell, self.globalCells) clipGlobalList[rankCell].append(cell) clipLocalList[rankCell].append(localID) clipList[i].globalCellList = clipGlobalList clipList[i].localCellList = clipLocalList return clipList
[docs] def clipByRank(self): """ A function that separates clipping by rank :returns: list of clips :rtype: list """ if(self.verbose): printCones("CALLING CLIP BY RANK", self.nRanks) lClips = [] for ir in range(0,self.nRanks): lClips.append([]) rClips = [] for clip in self.localClips: rClips.append(clip.cellList[ir]) rClips = [x for xs in rClips for x in xs] lClips[ir] = rClips if(self.verbose): printCones("Clips by rank") print(lClips) return lClips
[docs] def get_ranks(self): """ A function that get the number of ranks in a OpenFOAM simulation :returns: the number of processors of the original simulation :rtype: int """ dirlist = os.listdir(self.casePath) listProc = sorted([k for k in dirlist if 'processor' in k]) return len(listProc)
[docs] def get_clippings(self): """ Function that reads the clippings in polyMesh/sets/ Returns a list of clippings :returns: list of clippings :rtype: list """ path = self.casePath + "constant/polyMesh/sets/" flist = sorted(os.listdir(path)) clipList = [] for ifile, file in enumerate(flist): clipList.append(conesClip(file)) clipList[ifile].path = self.casePath clipList[ifile].nRanks = self.nRanks clipList[ifile].get_clip_data(self.globalCells) if(self.verbose): printCones("found the following clippings:") if(self.verbose): for ii, clip in enumerate(clipList): printCones(ii, "Global",clip.globalCellList) printCones(ii, "Local", clip.localCellList) return clipList
[docs] def setGlobalStateCells(self): """ Sets a list of global cell ids belonging from which state variables are extracted :returns: None """ stateCellsList = [] for irank in range(0,self.nRanks): rankCellList = [] for region in self.regions: rankCellList.append(region.globalCells[irank]) rankCellList = [cell for cellRank in rankCellList for cell in cellRank] stateCellsList.append(rankCellList) self.globalStateCells = stateCellsList return
[docs] def setLocalStateCells(self): """ Sets a list of local (rank) cell ids belonging from which state variables are extracted :returns: None """ stateCellList = [[] for _ in range(0,self.nRanks)] for irank in range(0,self.nRanks): for cell in self.globalStateCells[irank]: rank, localID = global2Local(cell, self.globalCells) stateCellList[rank].append(localID) self.localStateCells.append(stateCellList[irank]) return
[docs] def setGlobalCells(self): """ Generate a list of global cell ids splitted by ranks :returns: list of global cells ids :rtype: list """ globalCellList = [] for irank in range(0,self.nRanks): cellList = [] fpath = self.casePath + 'processor'+str(irank)+'/constant/polyMesh/cellProcAddressing' with open(fpath) as file: lines = file.readlines() nCells = int(lines[17].strip()) for icell in range(0,nCells): cellList.append(int(lines[19+icell].strip())) globalCellList.append(cellList) if (self.verbose): printCones("Global Cell List :", globalCellList) return globalCellList
[docs] def setLocalCells(self): """ Generate a list of local cells ids splitted by ranks :returns: list of local cells ids :rtype: list """ localCellList = [] for cellList in self.globalCells: localRankCellList = [] for cell in cellList: rank, localID = global2Local(cell, self.globalCells) localRankCellList.append(localID) localCellList.append(localRankCellList) return localCellList
[docs] def report_regions(self): """ Prints to stdout reports of conesRegions """ for region in self.regions: region.report()
[docs] class conesStaticObservation(): def __init__(self): self.type = None self.id = None self.x = np.zeros(3) self.value = None self.localCell = None self.globalCell = None self.ranks = [] self.std = None
[docs] def report(self): print("=========================") print("Observation id:", self.id) print("Coordinates", self.x) print("Value", self.value) print("Std", self.std) print("Rank", self.ranks) print("Local cell ID", self.localCell) print("Global cell ID", self.globalCell) print("=========================")
[docs] class conesRegion(): """ A class to define a Data Assimilation region """ def __init__(self): self.id = None self.ranks = [] self.globalCells = [] self.localCells = [] self.state = None self.stateNvar = None self.params = None self.sampling = None self.obsList = []
[docs] def setRanks(self): """ Set the number of ranks that spans the region :returns: None """ for i, a in enumerate(self.globalCells): if len(a) > 0: self.ranks.append(i) return
[docs] def addObs(self, obs): """ Method to include an observation to the region :param obs: The observation :type obs: conesStaticObservation :returns: None """ self.obsList.append(obs) return
[docs] def setState(self, state): """ Set the state vector for the region :param state: State Matrix :type state: numpy ndarray :returns: None """ self.state = state return
[docs] def splitState(self): splitState = np.split(self.state, self.stateNvar) #print("splitState by var",splitState) splitList = [len(x) for x in self.globalCells] return splitState
[docs] def filterState(self, state, stateCells): regionCells = [x for xs in self.globalCells for x in xs] stateCells = [x for xs in stateCells for x in xs] #printCones("Region", self.id, "cells", regionCells) #printCones("Region ", self.id, "state cells", stateCells) idx = [] for iState, cellState in enumerate(stateCells): if cellState in regionCells: idx.append(iState) #printCones("idx", idx) self.state=np.zeros((state.shape[0], len(idx), state[0].shape[1])) for ii, var in enumerate(state): self.state[ii] = var[idx] self.stateNvar = ii+1 self.state = np.vstack(self.state) return
[docs] def filterSampling(self, sampling): obsIDs = [] for obs in self.obsList: obsIDs.append(obs.id) self.sampling = sampling[obsIDs] printCones("Region ", self.id, " sampling", self.sampling) return
[docs] def setParams(self, params): """ Set the parameters associated to the region :param params: array of parameters :type params: numpy ndarray :returns: None """ self.params = params return
[docs] def report(self): """ Prints a report of the region to stdout """ print("========REGION REPORT========") print("Region", self.id) print("Cell list per proc:", [len(x) for x in self.localCells]) print("Observations", len(self.obsList)) print("ranks", self.ranks) print("state", self.state) print("params", self.params) #for obs in self.obsList: # obs.report() print("============================")
[docs] def full_report(self): """ Prints an extended report of the region to stdout """ print("========REGION REPORT========") print("Region", self.id) print("Global Cell list per proc:", len([cell for cells in self.globalCells for cell in cells]), self.globalCells) print("Local Cell list per proc", self.localCells) print("Observations", len(self.obsList)) print("ranks", self.ranks) print("state", self.state) print("params", self.params) #for obs in self.obsList: # obs.report() print("============================")
[docs] class conesEnKF(): """ A class containing all the basic elements to perform the Ensemble Kalman filter analysis phase """ def __init__(self, region): """ :param region: conesRegion where the analysis phase will be performed :type region: conesRegion """ self.region = region self.state = self.region.state self.params = self.region.params self.sampling = self.region.sampling self.nens = self.state.shape[1] self.x = np.vstack((self.state, self.params)) self.X = self.anomaly(self.x) self.S = self.anomaly(self.sampling) self.R = self.setR() self.H = self.setH() self.y = self.setObs() self.K = None self.upx = None self.upState = None self.upParams = None self.nAss = None
[docs] def setState(self, state): """ Set the EnKF state :param state: The state vector :type state: numpy ndarray :return: None """ self.state = state return
[docs] def setParams(self, params): """ Set the EnKF parameters :param params: The parameters vector :type params: numpy ndarray :returns: None """ self.params = params return
[docs] def setSampling(self, sampling): """ Set the EnKF sampling H(x) :param sampling: The sampling matrix :type sampling: numpy ndarray :returns: None """ self.sampling = sampling return
[docs] def setR(self): """ Set the observation covariance matrix from the conesRegions observations :returns: R :rtype: numpy ndarray """ r = np.empty(self.region.obsList[0].value.shape[0]) for ii, obs in enumerate(self.region.obsList): r = np.vstack((r, obs.std)) r = r[1::] r = r.T r = r.flatten(order='C') r[np.where(r==0)] = 1e-06 # Correcting if std is equal to 0 printCones(f"R = {r}") R = np.diag(r) return R
[docs] def setH(self): """ Returns the EnKF sampling matrix :returns: H :rtype: numpy ndarray """ H = self.sampling return H
[docs] def setObs(self): """ Add noise to the observation vector :returns: y :rtype: numpy ndarray """ y = np.zeros((self.region.obsList[0].value.shape[0]*len(self.region.obsList),self.x.shape[1])) #ydum = np.zeros(len(self.region.obsList)) for ie in range(0,self.x.shape[1]): ydum = np.empty([self.region.obsList[0].value.shape[0]]) for ii, obs in enumerate(self.region.obsList): ydum = np.vstack((ydum,np.random.normal(loc=obs.value, scale=obs.std))) ydum = ydum[1::,:] # ydum.ravel() ydum = ydum.flatten(order='F') printCones(f"ydum = {ydum} for ii = {ii}") y[:,ie] = ydum printCones(f"y = {y}") return y
[docs] def anomaly(self, x): """ Calculate an anomaly matrix :param x: Matrix containing the ensemble members data :type x: numpy ndarray :returns: The anomaly matrix :rtype: numpy ndarray """ xMean = x.mean(axis=1) X = np.zeros_like(x) for i in range(0,x.shape[1]): X[:,i] = (x[:,i] - xMean)/np.sqrt(x.shape[1] - 1) return X
[docs] def KalmanGain(self): """ Checks for matrix inversion and set the Kalman Gain :returns: None """ XST = self.X @ self.S.T SST = self.S @ self.S.T inv = np.linalg.inv(SST + self.R) printCones("Check inversion", (SST+self.R) @ inv) self.K = XST @ inv return
[docs] def update(self): """ Updates the state and parameters matrix :returns: None """ self.upx = self.x + self.K @ (self.y - self.H) return
# Inflation must be reviewed
[docs] def inflate(self, arr ,infl): """ Applies stochastic inflation on a matrix :param arr: the matrix to be inflated :type arr: numpy ndarray :param infl: inflation coefficient :type infl: float :returns: arr :rtype: numpy ndarray """ mean = np.mean(arr, axis=1) std = np.std(arr, axis=1) for ii in range(0, self.nens): arr[:,ii] = np.random.normal(loc=mean, scale=std*infl) return arr
[docs] def inflateState(self, infl): """ Stochastic inflation of the updated state matrix :param infl: The inflation coefficient :type infl: float :returns: None """ self.upState = self.inflate(self.upState, infl) return
[docs] def inflateParams(self, infl): """ Stochastic inflation of the updated parameters matrix :param infl: The inflation coefficient :type infl: float :returns: None """ self.upParams = self.inflate(self.upParams, infl) return
[docs] def updateState(self): """ Sets the updated state matrix :returns: None """ self.upState = self.upx[:self.state.shape[0]] return
[docs] def updateParams(self): """ Sets the updated parameters matrix :returns: None """ self.upParams = self.upx[self.state.shape[0]:] return
[docs] def full_report(self): """ Prints and extended report of the EnKF main features. :returns: None """ print("\n================ ENKF REPORT ==================\n") print("Region: ", self.region.id) print("state " , self.state) print("params: ", self.params) print("sampling: " , self.sampling) print("x", self.x) print("X", self.X) print("S", self.S) print("R", self.R) print("H", self.H, type(self.H)) print("y", self.y, type(self.y)) print("K", self.K) print("upx", self.upx) print("Updated State", self.upState) print("Updated Parameters", self.upParams) print("\n===============================================\n")
[docs] def small_report(self): """ Prints to stdout a smaller report of the EnKF :returns: None """ print("\n================ ENKF REPORT ==================\n") print("Region: ", self.region.id) print("Parameters: ", self.params) print("RMSE", np.linalg.norm(np.abs(self.y - self.sampling))) print("sampling: " , self.sampling) print("y", self.y) print("y-Hx", self.y - self.H) print("Updated Parameters", self.upParams) print("avg = ", np.mean(self.upParams)) print("std = ", np.std(self.upParams)) print("\n===============================================\n")
[docs] def dump_error_convergence(self): """ Dumps in file "err.log" the norm of the difference between observation and model sampling :returns: None """ with open("err.log", "a") as file: dump = str(np.linalg.norm(np.abs(self.y - self.sampling))) + "\n" file.write(dump) return
[docs] def dump_params_convergence(self): """ Dumps in file "par.log" the updated parameter ensemble mean and standard deviation :returns: None """ with open("par.log", "a") as file: dump = str(np.mean(self.upParams)) + "\t" +str(np.std(self.upParams)) + "\n" file.write(dump) return