import os
import sys
import numpy as np
from .conesFunctions import *
from .conesErrors import *
from itertools import cycle
"""
This module provides utility classes for the CONES framework.
:Author: Paolo Errante
:Version: 1.0.0
"""
[docs]
class conesParams:
""" A Class for Parameters.
It has to be developed.
"""
def __init__(self):
"""
Initialize the conesParam class
"""
self.id = None
self.type = None
self.value = None
self.patchName = None
[docs]
class conesClip:
"""
A Class for clippings
:Author: Paolo Errante
:Version: 1.0.0
"""
def __init__(self, name):
"""
Initialize the conesClip class.
:param name: The name of the clipping
:type name: str
"""
self.path = None
self.name = name
self.verbose = False
self.nRanks = None
self.ranks = []
self.localCellList = []
self.globalCellList = []
[docs]
def get_clip_data(self, globalCellList):
"""
Reads and sets the clippings cell lists
:param globalCellList: List of global cells ids
:type globalCellList: list
:returns: None
"""
clipGlobalCellList = []
clipLocalCellList = []
for irank in range(0,self.nRanks):
fpath = self.path + "processor"+str(irank) + "/constant/polyMesh/sets/"+self.name
localRankCellList = []
globalRankCellList = []
with open(fpath) as file:
lines = file.readlines()
nCells = int(lines[17].strip())
for icell in range(0,nCells):
localCellID= int(lines[19+icell].strip())
globalCellID = local2Global(localCellID, globalCellList, irank)
globalRankCellList.append(globalCellID)
localRankCellList.append(localCellID)
localRankCellList.sort()
globalRankCellList.sort()
clipLocalCellList.append(localRankCellList)
clipGlobalCellList.append(globalRankCellList)
if(self.verbose):
printCones("Clip:", self.name, " localCellList ", clipLocalCellList)
printCones("Clip:", self.name, " globalCellList", clipGlobalCellList)
self.localCellList = clipLocalCellList
self.globalCellList = clipGlobalCellList
return
[docs]
def report(self):
"""
Print to stdout a report of the clipping object
"""
print("\n================ CLIP REPORT ==================\n")
print("Name: ", self.name)
print("Path: " , self.path)
print("ranks: ", self.ranks)
print("local cells ID: " , self.localCellList)
print("global cells ID: ", self.globalCellList)
print("\n===============================================\n")
return
[docs]
class conesSettings:
"""
A class that groups settings for cones
"""
def __init__(self, casePath):
""" Initialize the class
:parameter casePath: the path of the source OpenFOAM case
:type casePath: str
"""
self.checkPath(casePath)
self.name = "conesSettings"
self.verbose = False
self.casePath = casePath
self.nRanks = self.get_ranks() # Number of openfoam ranks for one simulation
self.systemPath = casePath + "system/"
self.controlDict = self.systemPath + "controlDict"
self.conesDict = self.systemPath + "conesDict"
self.obsDict = self.systemPath + "obsDict"
self.obsType = self.get_option("obsType", self.obsDict)
self.Clips = []
self.mergedClips = []
self.regions = []
self.globalCells = self.setGlobalCells()
self.localCells = self.setLocalCells()
self.globalStateCells = None
self.localStateCells = []
self.stateVar = self.get_option("stateVar", self.conesDict)
self.nParams = int(self.get_option("numberParameters", self.conesDict))
self.stateEstSwitch = self.get_option("stateEstSwitch", self.conesDict) == "true"
self.paramEstSwitch = self.get_option("paramEstSwitch", self.conesDict) == "true"
# This is provisional and not used yet
self.params = []
for i in range(0,self.nParams):
self.params.append(conesParams())
# We get clips by default
self.Clips = self.get_clippings()#self.get_Global_clippings()
# We merge those clips that shares cells
self.mergedClips = self.mergeClips()
self.obsWindow = int(self.get_option("observationWindow", self.conesDict))
self.endTimeSim = float(self.get_option("endTime", self.controlDict))
self.deltaTSim = float(self.get_option("deltaT", self.controlDict))
self.total_it = int(self.endTimeSim / (self.obsWindow * self.deltaTSim))
[docs]
def checkPath(self, path):
"""
Check if path exists
:param path: The file directory
:type path: str
:returns: Boolean
:rtype: bool
"""
printCones("Checking ", path)
if not os.path.exists(path):
try:
raise conesPathNotFound(path)
except conesPathNotFound as e:
print(e, file=sys.stderr)
sys.exit(1)
return True
[docs]
def get_option(self, keyword, dictionary):
""" A function to get options in an OpenFOAM dictionary
:param keyword: the name of the option to get
:type keyword: str
:param dictionary: the name of the dictionary that contains the keyword
:type dictionary: str
:returns: The value set for the keyword
"""
# Check file exist
try:
self.checkPath(dictionary)
except conesPathNotFound as e:
print(e, file=stderr)
if(self.verbose):
printCones(" Looking for", keyword, "in", dictionary)
with open(dictionary, 'r') as file:
lines = file.readlines()
for line in lines:
#if the line contains the keyword
if line.find(keyword) != -1:
#if the keyword is an option and not an entry
if line.strip().replace(';','').split(" ")[0] == keyword:
value = line.strip().replace(';','').split(" ")[-1]
if(value[0] == "\""):
value = value[1:-1]
if(self.verbose):
printCones("Found ", keyword, " = ", value)
return value
[docs]
def mergeClips(self):
"""
Given a list of conesClip, finds clippings that shares cells and merge them
:returns: the list of conesClip after merge
:rtype: list
"""
results = mergeClips(self.Clips, self.globalCells)
clipList = []
for i, newClip in enumerate(results):
clipList.append(conesClip("newClip"+str(i)))
clipGlobalList = [[] for _ in range(self.nRanks)]
clipLocalList = [[] for _ in range(self.nRanks)]
for cell in newClip:
rankCell, localID = global2Local(cell, self.globalCells)
clipGlobalList[rankCell].append(cell)
clipLocalList[rankCell].append(localID)
clipList[i].globalCellList = clipGlobalList
clipList[i].localCellList = clipLocalList
return clipList
[docs]
def clipByRank(self):
""" A function that separates clipping by rank
:returns: list of clips
:rtype: list
"""
if(self.verbose):
printCones("CALLING CLIP BY RANK", self.nRanks)
lClips = []
for ir in range(0,self.nRanks):
lClips.append([])
rClips = []
for clip in self.localClips:
rClips.append(clip.cellList[ir])
rClips = [x for xs in rClips for x in xs]
lClips[ir] = rClips
if(self.verbose):
printCones("Clips by rank")
print(lClips)
return lClips
[docs]
def get_ranks(self):
""" A function that get the number of ranks in a OpenFOAM simulation
:returns: the number of processors of the original simulation
:rtype: int
"""
dirlist = os.listdir(self.casePath)
listProc = sorted([k for k in dirlist if 'processor' in k])
return len(listProc)
[docs]
def get_clippings(self):
"""
Function that reads the clippings in polyMesh/sets/
Returns a list of clippings
:returns: list of clippings
:rtype: list
"""
path = self.casePath + "constant/polyMesh/sets/"
flist = sorted(os.listdir(path))
clipList = []
for ifile, file in enumerate(flist):
clipList.append(conesClip(file))
clipList[ifile].path = self.casePath
clipList[ifile].nRanks = self.nRanks
clipList[ifile].get_clip_data(self.globalCells)
if(self.verbose):
printCones("found the following clippings:")
if(self.verbose):
for ii, clip in enumerate(clipList):
printCones(ii, "Global",clip.globalCellList)
printCones(ii, "Local", clip.localCellList)
return clipList
[docs]
def setGlobalStateCells(self):
"""
Sets a list of global cell ids belonging from which state variables are extracted
:returns: None
"""
stateCellsList = []
for irank in range(0,self.nRanks):
rankCellList = []
for region in self.regions:
rankCellList.append(region.globalCells[irank])
rankCellList = [cell for cellRank in rankCellList for cell in cellRank]
stateCellsList.append(rankCellList)
self.globalStateCells = stateCellsList
return
[docs]
def setLocalStateCells(self):
"""
Sets a list of local (rank) cell ids belonging from which state variables are extracted
:returns: None
"""
stateCellList = [[] for _ in range(0,self.nRanks)]
for irank in range(0,self.nRanks):
for cell in self.globalStateCells[irank]:
rank, localID = global2Local(cell, self.globalCells)
stateCellList[rank].append(localID)
self.localStateCells.append(stateCellList[irank])
return
[docs]
def setGlobalCells(self):
"""
Generate a list of global cell ids splitted by ranks
:returns: list of global cells ids
:rtype: list
"""
globalCellList = []
for irank in range(0,self.nRanks):
cellList = []
fpath = self.casePath + 'processor'+str(irank)+'/constant/polyMesh/cellProcAddressing'
with open(fpath) as file:
lines = file.readlines()
nCells = int(lines[17].strip())
for icell in range(0,nCells):
cellList.append(int(lines[19+icell].strip()))
globalCellList.append(cellList)
if (self.verbose):
printCones("Global Cell List :", globalCellList)
return globalCellList
[docs]
def setLocalCells(self):
"""
Generate a list of local cells ids splitted by ranks
:returns: list of local cells ids
:rtype: list
"""
localCellList = []
for cellList in self.globalCells:
localRankCellList = []
for cell in cellList:
rank, localID = global2Local(cell, self.globalCells)
localRankCellList.append(localID)
localCellList.append(localRankCellList)
return localCellList
[docs]
def report_regions(self):
"""
Prints to stdout reports of conesRegions
"""
for region in self.regions:
region.report()
[docs]
class conesStaticObservation():
def __init__(self):
self.type = None
self.id = None
self.x = np.zeros(3)
self.value = None
self.localCell = None
self.globalCell = None
self.ranks = []
self.std = None
[docs]
def report(self):
print("=========================")
print("Observation id:", self.id)
print("Coordinates", self.x)
print("Value", self.value)
print("Std", self.std)
print("Rank", self.ranks)
print("Local cell ID", self.localCell)
print("Global cell ID", self.globalCell)
print("=========================")
[docs]
class conesRegion():
"""
A class to define a Data Assimilation region
"""
def __init__(self):
self.id = None
self.ranks = []
self.globalCells = []
self.localCells = []
self.state = None
self.stateNvar = None
self.params = None
self.sampling = None
self.obsList = []
[docs]
def setRanks(self):
"""
Set the number of ranks that spans the region
:returns: None
"""
for i, a in enumerate(self.globalCells):
if len(a) > 0:
self.ranks.append(i)
return
[docs]
def addObs(self, obs):
"""
Method to include an observation to the region
:param obs: The observation
:type obs: conesStaticObservation
:returns: None
"""
self.obsList.append(obs)
return
[docs]
def setState(self, state):
"""
Set the state vector for the region
:param state: State Matrix
:type state: numpy ndarray
:returns: None
"""
self.state = state
return
[docs]
def splitState(self):
splitState = np.split(self.state, self.stateNvar)
#print("splitState by var",splitState)
splitList = [len(x) for x in self.globalCells]
return splitState
[docs]
def filterState(self, state, stateCells):
regionCells = [x for xs in self.globalCells for x in xs]
stateCells = [x for xs in stateCells for x in xs]
#printCones("Region", self.id, "cells", regionCells)
#printCones("Region ", self.id, "state cells", stateCells)
idx = []
for iState, cellState in enumerate(stateCells):
if cellState in regionCells:
idx.append(iState)
#printCones("idx", idx)
self.state=np.zeros((state.shape[0], len(idx), state[0].shape[1]))
for ii, var in enumerate(state):
self.state[ii] = var[idx]
self.stateNvar = ii+1
self.state = np.vstack(self.state)
return
[docs]
def filterSampling(self, sampling):
obsIDs = []
for obs in self.obsList:
obsIDs.append(obs.id)
self.sampling = sampling[obsIDs]
printCones("Region ", self.id, " sampling", self.sampling)
return
[docs]
def setParams(self, params):
"""
Set the parameters associated to the region
:param params: array of parameters
:type params: numpy ndarray
:returns: None
"""
self.params = params
return
[docs]
def report(self):
"""
Prints a report of the region to stdout
"""
print("========REGION REPORT========")
print("Region", self.id)
print("Cell list per proc:", [len(x) for x in self.localCells])
print("Observations", len(self.obsList))
print("ranks", self.ranks)
print("state", self.state)
print("params", self.params)
#for obs in self.obsList:
# obs.report()
print("============================")
[docs]
def full_report(self):
"""
Prints an extended report of the region to stdout
"""
print("========REGION REPORT========")
print("Region", self.id)
print("Global Cell list per proc:", len([cell for cells in self.globalCells for cell in cells]), self.globalCells)
print("Local Cell list per proc", self.localCells)
print("Observations", len(self.obsList))
print("ranks", self.ranks)
print("state", self.state)
print("params", self.params)
#for obs in self.obsList:
# obs.report()
print("============================")
[docs]
class conesEnKF():
"""
A class containing all the basic elements to perform the Ensemble Kalman filter analysis phase
"""
def __init__(self, region):
"""
:param region: conesRegion where the analysis phase will be performed
:type region: conesRegion
"""
self.region = region
self.state = self.region.state
self.params = self.region.params
self.sampling = self.region.sampling
self.nens = self.state.shape[1]
self.x = np.vstack((self.state, self.params))
self.X = self.anomaly(self.x)
self.S = self.anomaly(self.sampling)
self.R = self.setR()
self.H = self.setH()
self.y = self.setObs()
self.K = None
self.upx = None
self.upState = None
self.upParams = None
self.nAss = None
[docs]
def setState(self, state):
"""
Set the EnKF state
:param state: The state vector
:type state: numpy ndarray
:return: None
"""
self.state = state
return
[docs]
def setParams(self, params):
"""
Set the EnKF parameters
:param params: The parameters vector
:type params: numpy ndarray
:returns: None
"""
self.params = params
return
[docs]
def setSampling(self, sampling):
"""
Set the EnKF sampling H(x)
:param sampling: The sampling matrix
:type sampling: numpy ndarray
:returns: None
"""
self.sampling = sampling
return
[docs]
def setR(self):
"""
Set the observation covariance matrix from the conesRegions observations
:returns: R
:rtype: numpy ndarray
"""
r = np.empty(self.region.obsList[0].value.shape[0])
for ii, obs in enumerate(self.region.obsList):
r = np.vstack((r, obs.std))
r = r[1::]
r = r.T
r = r.flatten(order='C')
r[np.where(r==0)] = 1e-06 # Correcting if std is equal to 0
printCones(f"R = {r}")
R = np.diag(r)
return R
[docs]
def setH(self):
"""
Returns the EnKF sampling matrix
:returns: H
:rtype: numpy ndarray
"""
H = self.sampling
return H
[docs]
def setObs(self):
"""
Add noise to the observation vector
:returns: y
:rtype: numpy ndarray
"""
y = np.zeros((self.region.obsList[0].value.shape[0]*len(self.region.obsList),self.x.shape[1]))
#ydum = np.zeros(len(self.region.obsList))
for ie in range(0,self.x.shape[1]):
ydum = np.empty([self.region.obsList[0].value.shape[0]])
for ii, obs in enumerate(self.region.obsList):
ydum = np.vstack((ydum,np.random.normal(loc=obs.value, scale=obs.std)))
ydum = ydum[1::,:]
# ydum.ravel()
ydum = ydum.flatten(order='F')
printCones(f"ydum = {ydum} for ii = {ii}")
y[:,ie] = ydum
printCones(f"y = {y}")
return y
[docs]
def anomaly(self, x):
"""
Calculate an anomaly matrix
:param x: Matrix containing the ensemble members data
:type x: numpy ndarray
:returns: The anomaly matrix
:rtype: numpy ndarray
"""
xMean = x.mean(axis=1)
X = np.zeros_like(x)
for i in range(0,x.shape[1]):
X[:,i] = (x[:,i] - xMean)/np.sqrt(x.shape[1] - 1)
return X
[docs]
def KalmanGain(self):
"""
Checks for matrix inversion and set the Kalman Gain
:returns: None
"""
XST = self.X @ self.S.T
SST = self.S @ self.S.T
inv = np.linalg.inv(SST + self.R)
printCones("Check inversion", (SST+self.R) @ inv)
self.K = XST @ inv
return
[docs]
def update(self):
"""
Updates the state and parameters matrix
:returns: None
"""
self.upx = self.x + self.K @ (self.y - self.H)
return
# Inflation must be reviewed
[docs]
def inflate(self, arr ,infl):
"""
Applies stochastic inflation on a matrix
:param arr: the matrix to be inflated
:type arr: numpy ndarray
:param infl: inflation coefficient
:type infl: float
:returns: arr
:rtype: numpy ndarray
"""
mean = np.mean(arr, axis=1)
std = np.std(arr, axis=1)
for ii in range(0, self.nens):
arr[:,ii] = np.random.normal(loc=mean, scale=std*infl)
return arr
[docs]
def inflateState(self, infl):
"""
Stochastic inflation of the updated state matrix
:param infl: The inflation coefficient
:type infl: float
:returns: None
"""
self.upState = self.inflate(self.upState, infl)
return
[docs]
def inflateParams(self, infl):
"""
Stochastic inflation of the updated parameters matrix
:param infl: The inflation coefficient
:type infl: float
:returns: None
"""
self.upParams = self.inflate(self.upParams, infl)
return
[docs]
def updateState(self):
"""
Sets the updated state matrix
:returns: None
"""
self.upState = self.upx[:self.state.shape[0]]
return
[docs]
def updateParams(self):
"""
Sets the updated parameters matrix
:returns: None
"""
self.upParams = self.upx[self.state.shape[0]:]
return
[docs]
def full_report(self):
"""
Prints and extended report of the EnKF main features.
:returns: None
"""
print("\n================ ENKF REPORT ==================\n")
print("Region: ", self.region.id)
print("state " , self.state)
print("params: ", self.params)
print("sampling: " , self.sampling)
print("x", self.x)
print("X", self.X)
print("S", self.S)
print("R", self.R)
print("H", self.H, type(self.H))
print("y", self.y, type(self.y))
print("K", self.K)
print("upx", self.upx)
print("Updated State", self.upState)
print("Updated Parameters", self.upParams)
print("\n===============================================\n")
[docs]
def small_report(self):
"""
Prints to stdout a smaller report of the EnKF
:returns: None
"""
print("\n================ ENKF REPORT ==================\n")
print("Region: ", self.region.id)
print("Parameters: ", self.params)
print("RMSE", np.linalg.norm(np.abs(self.y - self.sampling)))
print("sampling: " , self.sampling)
print("y", self.y)
print("y-Hx", self.y - self.H)
print("Updated Parameters", self.upParams)
print("avg = ", np.mean(self.upParams))
print("std = ", np.std(self.upParams))
print("\n===============================================\n")
[docs]
def dump_error_convergence(self):
"""
Dumps in file "err.log" the norm of the difference between observation and model sampling
:returns: None
"""
with open("err.log", "a") as file:
dump = str(np.linalg.norm(np.abs(self.y - self.sampling))) + "\n"
file.write(dump)
return
[docs]
def dump_params_convergence(self):
"""
Dumps in file "par.log" the updated parameter ensemble mean and standard deviation
:returns: None
"""
with open("par.log", "a") as file:
dump = str(np.mean(self.upParams)) + "\t" +str(np.std(self.upParams)) + "\n"
file.write(dump)
return