逆畳み込み
1. Jacobi法による逆畳み込み
2. Gauss-Seidel法による逆畳み込み
参考文献:「科学計測のための波形データ処理」、南茂夫 編著 (CQ出版社, 1986)

動作確認用
3. numpy.convolveによる畳み込み
4. scipy.signal.deconvolveによる逆畳み込み
5. fftによる逆畳み込み
注意点:
yconv = np.convolve(yraw, ywf, **kwargs) / sum(ywf)
 畳み込み関数を規格化するため、フィルター関数ywfの和で割る必要がある。
 戻り値のリストは、フィルター関数の点数の1/2だけ、前後に拡張される。
  mode = "same"を指定すると、戻り値のリストの範囲は入力関数の範囲に一致させられる。
  逆畳み込みで元のデータに戻すには、拡張部分のデータも必要。
IDec, remainder = scipy.signal.deconvolve(Iconv, ywf) * sum(ywf)
 逆畳み込みはも入力データの質に非常に敏感。以下の条件を満足しないと、ちゃんとした結果が得られない。
・フィルター関数の幅は入力データよりも短い
・フィルター関数の値はある程度の値よりも大きい(0に近い値があるとだめ?)
・入力データの前には、フィルター関数の幅よりも大きいゼロ領域が必要
・入力データにノイズがある場合は、適切に平滑化処理が必要

Download script from .\deconvolution.py


import sys
import os.path
import csv
import re
from math import exp, log
import numpy as np
import scipy.signal
import matplotlib.pyplot as plt


"""
逆畳み込み
1. Jacobi法による逆畳み込み
2. Gauss-Seidel法による逆畳み込み
参考文献:「科学計測のための波形データ処理」、南茂夫 編著 (CQ出版社, 1986)

動作確認用
3. numpy.convolveによる畳み込み
4. scipy.signal.deconvolveによる逆畳み込み
5. fftによる逆畳み込み
注意点:
yconv = np.convolve(yraw, ywf, **kwargs) / sum(ywf)
 畳み込み関数を規格化するため、フィルター関数ywfの和で割る必要がある。
 戻り値のリストは、フィルター関数の点数の1/2だけ、前後に拡張される。
  mode = "same"を指定すると、戻り値のリストの範囲は入力関数の範囲に一致させられる。
  逆畳み込みで元のデータに戻すには、拡張部分のデータも必要。
IDec, remainder = scipy.signal.deconvolve(Iconv, ywf) * sum(ywf)
 逆畳み込みはも入力データの質に非常に敏感。以下の条件を満足しないと、ちゃんとした結果が得られない。
・フィルター関数の幅は入力データよりも短い
・フィルター関数の値はある程度の値よりも大きい(0に近い値があるとだめ?)
・入力データの前には、フィルター関数の幅よりも大きいゼロ領域が必要
・入力データにノイズがある場合は、適切に平滑化処理が必要
"""



#===================
# Global parametrs
#===================
# File
infile = "pes.csv"


# Analysis range
xmin = -4.5 # eV
xmax = 2.0 # eV

# mode = [gs|jacobi|convolve|fft]
mode = 'gs'

# Filter parameters for Gauss function
Wa = 0.121223 # eV
Grange = 2.0

# for update graph
sleeptime = 0.001

# for Jacobi/Gauss-Seidel method
dump = 1.0
nmaxiter = 300
eps = 1.0e-4
nsmooth = 5

zero_correction = 0

# only for 'convolve': convmode = [same|full|]
convmode = 'same'
# smoothmode = [convolve|extend|average]
smoothmode = 'convolve+extend'

# only for 'convolve' and 'fft': 'exnted data' parameters
kzero = 5
klin = 5


#===================
# 起動時引数
#===================
argv = sys.argv
scriptname = argv[0]
def usage():
    print("usage: python {} file mode convmode smoothmode xmin xmax Wa Grange kzero klin".format(scriptname))
    print(" mode = [convolve|fft]")
    print(" convmode = [''|full|same]")
    print(" smoothmode = combination of [none|convolve|extend|average]")
    print(" ex.: python {} pes.csv convolve same convolve+extend -4.5 2.0 0.12 2.0 1 5".format(scriptname))
    print(" ex.: python {} pes.csv fft full convolve+extend -4.5 2.0 0.12 2.0 5 5".format(scriptname))
    print("")
    print("usage: python {} file mode xmin xmax Wa dump nmaxiter eps nsmooth zeroc".format(scriptname))
    print(" mode = [gs|jacobi] gs=Gauss-Seidel method jacobi=Jacobi method")
    print(" zeroc = [0|1] zero correction after a Jacobi/Gauss-Seidel cycle]")
    print(" ex.: python {} pes.csv gs -6.0 2.0 0.12 1.0 300 1.0e-4 5 0".format(scriptname))
    print("")

if len(argv) == 1:
    usage()
    exit()

if len(argv) >= 2:
    infile = argv[1]
if len(argv) >= 3:
    mode = argv[2]

if mode == 'convolve' or mode == 'fft':
    if len(argv) >= 4:
        convmode = argv[3]
    if len(argv) >= 5:
        smoothmode = argv[4]
    if len(argv) >= 6:
        xmin = float(argv[5])
    if len(argv) >= 7:
        xmax = float(argv[6])
    if len(argv) >= 8:
        Wa = float(argv[7])
    if len(argv) >= 9:
        Grange = float(argv[8])
    if len(argv) >= 10:
        kzero = float(argv[9])
    if len(argv) >= 11:
        klin = float(argv[10])
elif mode == 'gs' or mode == 'jacobi':
    convmode = ''
    smoothmode = ''
    if len(argv) >= 4:
        xmin = float(argv[3])
    if len(argv) >= 5:
        xmax = float(argv[4])
    if len(argv) >= 6:
        Wa = float(argv[5])
    if len(argv) >= 7:
        dump = float(argv[6])
    if len(argv) >= 8:
        nmaxiter = int(argv[7])
    if len(argv) >= 9:
        eps = float(argv[8])
    if len(argv) >= 10:
        nsmooth = int(argv[9])
    if len(argv) >= 11:
        zero_correction = int(argv[10])
else:
    print("")
    print("Error: Invalid mode=[{}]".format(mode))
    usage()
    exit()

header, ext = os.path.splitext(infile)
outcsvfile = header + '-deconvoluted.csv'


def savecsv(outfile, header, datalist):
    try:
        print("Write to [{}]".format(outfile))
        f = open(outfile, 'w')
    except:
#    except IOError:
        print("Error: Can not write to [{}]".format(outfile))
    else:
        fout = csv.writer(f, lineterminator='\n')
        fout.writerow(header)
#        fout.writerows(data)
        for i in range(0, len(datalist[0])):
            a = []
            for j in range(len(datalist)):
                a.append(datalist[j][i])
            fout.writerow(a)
        f.close()

def read_csv(infile, delimiter = ',', xmin = None, xmax = None):
    data = []
    try:
        infp = open(infile, "r")
        f = csv.reader(infp, delimiter = delimiter)
        header = next(f)
        for j in range(len(header)):
            data.append([])

        for row in f:
            try:
                E = float(row[0])
                I = float(row[1])
            except:
                continue
            if xmin <= E <= xmax:
                for j in range(len(row)):
                    data[j].append(float(row[j]))
    except:
        print("Error: Can not read [{}]".format(infile))
        exit()
    return header, data

def read_data(infile, xmin = None, xmax = None):
    if '.TXT' in infile.upper():
        header, data = read_csv(infile, '\t', xmin, xmax)
        return header, -np.array(data[0]), np.array(data[2])
    else:
        header, data = read_csv(infile, ',', xmin, xmax)
        return header, np.array(data[0]), np.array(data[1])

def Gaussian(x, x0, whalf):
#A = 1/whalf * sqrt(ln2 / pi)
    A = 0.469718639 / whalf
#a = whalf / sqrt(ln2)
    a = whalf / 0.832554611
    X = (x - x0) / a
    return A * exp(-X*X)

def Hij(xstep, Wa, Grange, i, j):
#    ixG0 = int(Grange * Wa / xstep + 1.0001)
#    if abs(j - i) > ixG0:
#        return 0.0
    return Gaussian((j - i) * xstep, 0.0, Wa)

def make_wf(Wa, Grange, xstep):
    ixG0 = int(Grange * Wa / xstep + 1.0001)
    ixGmax = 2 * ixG0
    nxGmax = ixG0 + 1
    xG0 = ixG0 * xstep

    xG = []
    yG = []
    for i in range(ixGmax+1):
        x = i * xstep
        xG.append(x)
        yG.append(Gaussian(x, xG0, Wa))

    SG = 0.0
    for i in range(len(yG)-1):
        SG += (yG[i] + yG[i+1]) / 2.0 * (xG[i+1] - xG[i])

    for i in range(ixGmax+1):
        yG[i] /= SG

    print(" Range: {} in width".format(Grange * Wa))
    print(" i range: {} - {} at center {}".format(0,ixGmax, xG0))
    print(" ixGmax = ", ixGmax)
    print(" SG = ", SG)

    return xG, yG


def convolve(xraw, yraw, ywf, **kwargs):
    yconv = np.convolve(yraw, ywf, **kwargs) / sum(ywf)
    n_new = len(yconv)
    dn = n_new - len(yraw)
    if dn > 0:
        offset = int(dn / 2)
        xmin = xraw[0]
        xstep = xraw[1] - xmin
        xmin_new = xmin - offset * xstep
        print("convolve: the length of the output data has been changed "
                + "from {} to {}".format(len(yraw), n_new))
        print(" Add offset = {}".format(offset))
        print(" xmin changes: {} => {}".format(xmin, xmin_new))
        x = np.array([xmin_new + i * xstep for i in range(n_new)])
        return x, yconv
    return xraw, yconv

def extend_smooth(x, y, nzero, nlin, xstep = 0.0):
    xmin = x[0]
    xstep = x[1] - x[0]
    xmin_new = x[0] - nzero * xstep
    n_new = int(nzero + 1.0e-5) + len(x)
    nlin = int(nlin + 1.0e-5)
    nzero = int(nzero + 1.0e-5)
    print("extend_smooth:")
    print(" Add {} zeros at top of the data".format(nzero))
    print(" xmin changes: {} => {}".format(xmin, xmin_new))
    print(" Reshape {} input data with a linear filter".format(nlin))

    xx = np.array([xmin_new + i * xstep for i in range(n_new)])
    yy = np.zeros(n_new)
    for i in range(nlin):
        k = i / (nlin - 1)
        yy[i+nzero] = k * y[i]
    for i in range(len(x) - nlin):
        yy[i+nzero+nlin] = y[i+nzero]
    return xx, yy

def SmoothingBySimpleAverage(y, n):
    n2 = int(n / 2);
    ndata = len(y);
    ys = []
    for i in range(0, ndata):
        c = 0;
        ys.append(0.0);
        for k in range(i - n2, i + n2 + 1):
            if k < 0 or k >= ndata:
                continue
            ys[i] += y[k]
            c += 1
        if c > 0:
            ys[i] /= c;
        else:
            ys[i] = y[i]
    return ys;

def SmoothingByPolynomialFit(y, n):
    m = int(n / 2);
    W23 = (4.0 * m * m - 1.0) * (2.0 * m + 3.0) / 3.0;
    w23j = [0.0]*n
    for j in range(-m, m+1):
        w23j[j + m] = (3.0 * m * (m+1.0) - 1.0 - 5.0 * j * j) / W23

    ndata = len(y)
    ys = []
    for i in range(0, ndata):
        c = 0.0;
        ys.append(0.0);
        for j in range(-m, m+1):
            k = i + j
            if k < 0 or k >= ndata:
                continue
            ys[i] += w23j[j+m] * y[k]
            c += w23j[j+m]
        if c > 0:
            ys[i] /= c
        else:
            ys[i] = y[i]
    return ys;


def deconvolute_fft(xRaw, yRaw, xG, yG):
    k = sum(yG)

    print("Deconvolution by FFT")
    n = len(xRaw)
    nlog = int(log(n) / log(2) + 1.0 - 1.0e-5)
    nfft = pow(2, nlog)
    print(" Data number is changed from {} to 2^{} = {} for FFT".format(n, nlog, nfft))

    xmin = xRaw[0]
    xstep = xRaw[1] - xmin
    xRawFFT = [xmin + i * xstep for i in range(nfft)]
    yRawFFT = np.insert(yRaw, len(yRaw), np.zeros(nfft - n))
# filterの中心位置の原点からのずれによって、iFFT後の原点がずれる
    yGFFT = np.insert(yG, len(yG), np.zeros(nfft - len(yG)))
    xminG = xmin + len(xG) / 2 * xstep
    print(" xmin: ", xmin, xminG)
    xGFFT = [xminG + i * xstep for i in range(nfft)]
#    nadd = int((nfft - len(yG)) / 2)
#    yGFFT = np.insert(yG, len(yG), np.zeros(nadd))
#    yGFFT = np.insert(yGFFT, 0, np.zeros(nfft - len(yGFFT)))

    yRawFFTed = np.fft.fft(yRawFFT)
    yGFFTed = np.fft.fft(yGFFT)
    ycFFTed = yRawFFTed / yGFFTed
    ydeconv = np.fft.ifft(ycFFTed)
    ydeconv = [float(ydeconv[i]) for i in range(len(ydeconv))]

    print("")

    return xGFFT, ydeconv, xRawFFT, yRawFFT, xRawFFT, yGFFT

def deconvolute_deconvolve(xRaw, yRaw, xG, yG):
    k = sum(yG)

    print("Deconvolution by scipy.signal.deconvove")
    print("")
    IDec, remainder = scipy.signal.deconvolve(yRaw, yG)
    IDec *= k
    ndata = len(xRaw)
    nGhalf = int(len(xG) / 2)

    return xRaw[nGhalf:ndata-nGhalf], IDec

def deconvolute_jacobi(xRaw, yRaw, xG, yG, fig, ax):
    global Wa, Grange

    k = sum(yG)

    print("Deconvolution by Jacobi method")
    print("")

    xstep = xRaw[1] - xRaw[0]

    xgmin = min(xRaw)
    xgmax = max(xRaw)

    n = len(xRaw)
    Sg = np.zeros(n)
    for i in range(n):
        for j in range(n):
            Sg[i] += Hij(xstep, Wa, Grange, i, j)
    print("Filter area w.r.t. i: Sg=", Sg[int(n/2)])

    ymax = max([abs(yRaw[i]) for i in range(n)])
    y = yRaw.copy()
    yPrev = yRaw.copy()

    for it in range(nmaxiter):
        Hx = np.zeros(n)
        print("iter=", it)

        for i in range(n):
            for j in range(n):
                Hx[i] += Hij(xstep, Wa, Grange, i, j) * yPrev[j]
            h = Hij(xstep, Wa, Grange, i, i)
            y[i] = yPrev[i] + (yRaw[i] - Hx[i] / Sg[i]) / h * dump

#        y = SmoothingBySimpleAverage(y, nsmooth)
        y = SmoothingByPolynomialFit(y, nsmooth)
        if zero_correction:
            for i in range(n):
                if y[i] < 0.0:
                    y[i] = 0.0

        ax[0].cla()

        data1 = ax[0].plot(xRaw, yRaw, label = 'raw/initial')
        data1 = ax[0].plot(xRaw, y, label = 'updated')
#        data4 = ax[2].plot(xG, yG, label = 'filter')
        ax[0].set_xlim([xgmin, xgmax])
        ygmax = max([max(xRaw), max(y)])
        ax[0].set_ylim([0.0, ygmax])
#        ax[1].set_xlim([xgmin, xgmax])
#        ax[1].set_ylim([0.0, max(yRaw)])
#        ax[2].set_xlim([xgmin, xgmax])
#        ax[2].set_ylim([0.0, max(yG)])

        ax[0].legend()
#        ax[2].legend()
        plt.tight_layout()
        plt.pause(sleeptime)

        max_err = max([abs(y[i] - yPrev[i]) for i in range(n)])
        rel_err = max_err / ymax
        print(" max error: ", max_err, " relative error: ", rel_err, " eps=", eps)
        if max_err / ymax < eps:
            print("Converged at max_err={} ({} relative) < {}".format(max_err, rel_err, eps))
            break

        yPrev = y.copy()
    else:
        print("Not converged")

    return xRaw, y

def deconvolute_gauss_seidel(xRaw, yRaw, xG, yG, fig, ax):
    global Wa, Grange

    k = sum(yG)

    print("Deconvolution by Jacobi method")
    print("")

    xstep = xRaw[1] - xRaw[0]

    xgmin = min(xRaw)
    xgmax = max(xRaw)

    n = len(xRaw)
    Sg = np.zeros(n)
    for i in range(n):
        for j in range(n):
            Sg[i] += Hij(xstep, Wa, Grange, i, j)
    print("Filter area w.r.t. i: Sg=", Sg[int(n/2)])

    ymax = max([abs(yRaw[i]) for i in range(n)])
    y = yRaw.copy()
    yPrev = yRaw.copy()

    for it in range(nmaxiter):
        Hx = np.zeros(n)
        print("iter=", it)

        for i in range(n):
            for j in range(i):
                Hx[i] += Hij(xstep, Wa, Grange, i, j) * y[j]
            for j in range(i, n):
                Hx[i] += Hij(xstep, Wa, Grange, i, j) * yPrev[j]
            h = Hij(xstep, Wa, Grange, i, i)
            y[i] = yPrev[i] + (yRaw[i] - Hx[i] / Sg[i]) / h * dump

#        y = SmoothingBySimpleAverage(y, nsmooth)
        y = SmoothingByPolynomialFit(y, nsmooth)
        if zero_correction:
            for i in range(n):
                if y[i] < 0.0:
                    y[i] = 0.0

        ax[0].cla()

        data1 = ax[0].plot(xRaw, yRaw, label = 'raw/initial')
        data1 = ax[0].plot(xRaw, y, label = 'updated')
#        data4 = ax[2].plot(xG, yG, label = 'filter')
        ax[0].set_xlim([xgmin, xgmax])
        ygmax = max([max(xRaw), max(y)])
        ax[0].set_ylim([0.0, ygmax])
#        ax[1].set_xlim([xgmin, xgmax])
#        ax[1].set_ylim([0.0, max(yRaw)])
#        ax[2].set_xlim([xgmin, xgmax])
#        ax[2].set_ylim([0.0, max(yG)])

        ax[0].legend()
#        ax[2].legend()
        plt.tight_layout()
        plt.pause(sleeptime)

        max_err = max([abs(y[i] - yPrev[i]) for i in range(n)])
        rel_err = max_err / ymax
        print(" max error: ", max_err, " relative error: ", rel_err, " eps=", eps)
        if max_err / ymax < eps:
            print("Converged at max_err={} ({} relative) < {}".format(max_err, rel_err, eps))
            break

        yPrev = y.copy()
    else:
        print("Not converged")

    return xRaw, y


#======================
# main
#======================
def main():
    print("infile : ", infile)
    print("outfile : ", outcsvfile)
    print("mode : ", mode)
    if mode == 'convolve' or mode == 'fft':
        print("For mode = 'convolve' or 'fft':")
        print(" convmode: ", convmode)
        print(" x range : ", xmin, xmax)
    if mode == 'gs' or mode == 'jacobi':
        print("For mode = 'gs' or 'jacobi':")
        print(" dump=", dump)
        print(" nmaxiter=", nmaxiter)
        print(" eps=", eps)
        print(" nsmooth=", nsmooth)
        print(" zero correction=", zero_correction)
    print("")

    header, xin, yin = read_data(infile, xmin, xmax)
    nindata = len(xin)
    xinmin = xin[0]
    xinstep = xin[1] - xin[0]
    xinmax = xinmin + xinstep * (nindata - 1)
    print("Input data")
    print(" ndata = ", nindata)
    print(" x range: {} - {} at {} step".format(xinmin, xinmax, xinstep))
    print("Smooth mode: ", smoothmode)
    print("")

    xRaw = xin
    yRaw = yin
    xstep = xinstep
    print("Filter: Wa = {} Grange = {}".format(Wa, Grange))
    xG, yG = make_wf(Wa, Grange, xstep)
    print("")


    fig = plt.figure(figsize = (15,7))
    ax = []
    ax.append(fig.add_subplot(3, 1, 1))
    ax.append(fig.add_subplot(3, 1, 2))
    ax.append(fig.add_subplot(3, 1, 3))

# make data to be deconvolved
    nw = int(len(xG) / 2)
    if 'average' in smoothmode:
        yRaw = SmoothingBySimpleAverage(yRaw, 31)
    if 'extend' in smoothmode:
        xRaw, yRaw = extend_smooth(xRaw, yRaw, nw*kzero, nw*klin, xstep)
    if 'convolve' in smoothmode:
        xconv, yconv = convolve(xRaw, yRaw, yG, mode = convmode)
    else:
        xconv, yconv = xRaw, yRaw

# deconvolution
    if mode == 'fft':
        xDec, yDec, xRawFFT, yRawFFT, xGFFT, yGFFT = deconvolute_fft(xconv, yconv, xG, yG)
        xconv = xRawFFT
        yconv = yRawFFT
        datafft = ax[2].plot(xGFFT, yGFFT, label = 'WF for FFT')
    elif mode == 'jacobi':
        xDec, yDec = deconvolute_jacobi(xconv, yconv, xG, yG, fig, ax)
        yG = [Gaussian(xRaw[i], 0.0, Wa) for i in range(len(xRaw))]
        datafft = ax[2].plot(xRaw, yG, label = 'WF for Jacobi method')
    elif mode == 'gs':
        xDec, yDec = deconvolute_gauss_seidel(xconv, yconv, xG, yG, fig, ax)
        yG = [Gaussian(xRaw[i], 0.0, Wa) for i in range(len(xRaw))]
        datafft = ax[2].plot(xRaw, yG, label = 'WF for Gauss-Seidel method')
    else:
        xDec, yDec = deconvolute_deconvolve(xconv, yconv, xG, yG)
        datafft = ax[2].plot(xG, yG, label = 'WF for convolve')

    xgmin = min([min(xconv), min(xDec)])
    xgmax = max([max(xconv), max(xDec)])

    ax[0].cla()
    data1 = ax[0].plot(xconv, yconv, label = 'input(convoluted)')
    data3 = ax[0].plot(xDec, yDec, label = 'deconvoluted')
    data1 = ax[1].plot(xRaw, yRaw, label = 'input(raw)')
    data2 = ax[1].plot(xconv, yconv, label = 'input(convoluted)')
    ax[0].set_xlim([xgmin, xgmax])
    ax[0].set_ylim([0.0, max([max(yRaw), max(yDec)])])
    ax[1].set_xlim([xgmin, xgmax])
    ax[1].set_ylim([0.0, max(yRaw)])
    ax[2].set_xlim([xgmin, xgmax])
    ax[2].set_ylim([0.0, max(yG)])
#    ax[1].set_ylim([0.0, max(yRaw)])

    ax[0].legend()
    ax[1].legend()
    ax[2].legend()
    plt.tight_layout()

    plt.pause(0.001)

    print("")
    print("Save deconvoluted data to [{}]".format(outcsvfile))
    savecsv(outcsvfile, ['x', 'y(input)', 'y(deconvoluted)'], [xconv, yconv, yDec])

    print("Press ENTER to exit>>", end = '')
    input()


if __name__ == '__main__':
    usage()
    main()