Epidemic modeling - Part 7

Convolution, deconvolution, and daily data
modeling
SEIR
epidemiology
stochastic
COVID-19
Author

Jeffrey Post

Published

April 3, 2020

Code
import pandas as pd
import numpy as np

import math

from scipy import signal

import plotly.graph_objects as go
import plotly.express as px

from scipy.stats import expon
from scipy.stats import gamma
from scipy.stats import weibull_min

from numpy.random import default_rng
rng = default_rng()
import plotly
import plotly.io as pio
from IPython.display import display, HTML

## Tomas Mazak's workaround
plotly.offline.init_notebook_mode()
display(HTML(
    #'<script type="text/javascript" async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js?config=TeX-MML-AM_SVG"></script>'
    '<script type="text/javascript" async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/MathJax.js?config=TeX-MML-AM_SVG"></script>'
    '<script src="https://cdn.plot.ly/plotly-3.0.1.js" charset="utf-8"></script>'
))

pio.renderers.default = "plotly_mimetype+notebook_connected"

pio.templates.default = "plotly_dark"


# Let's build a numerical solution
def seir_model(init, parms, days):
    S_0, E_0, I_0, R_0 = init
    Epd, Ipd, Rpd = [0], [0], [0]
    S, E, I, R = [S_0], [E_0], [I_0], [R_0]
    dt=0.1
    t = np.linspace(0,days,int(days/dt))
    sigma, beta, gam = parms
    for _ in t[1:]:
        next_S = S[-1] - beta*S[-1]*I[-1]*dt
        Epd.append(beta*S[-1]*I[-1]*dt)
        next_E = E[-1] + (beta*S[-1]*I[-1] - sigma*E[-1])*dt
        Ipd.append(sigma*E[-1]*dt)
        next_I = I[-1] + (sigma*E[-1] - gam*I[-1])*dt
        Rpd.append(gam*I[-1]*dt)
        next_R = R[-1] + (gam*I[-1])*dt
        S.append(next_S)
        E.append(next_E)
        I.append(next_I)
        R.append(next_R)
    return np.stack([S, E, I, R, Epd, Ipd, Rpd]).T
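
For reference, here is a minimal sketch of how this deterministic model can be called. The parameter values are illustrative assumptions, not fitted estimates, and S, E, I, R are treated as fractions of the population (the model omits division by N):

Code
# Illustrative run with assumed (not fitted) parameter values
init = 0.999, 0.001, 0.0, 0.0        # S_0, E_0, I_0, R_0 as population fractions
parms = 1/5.2, 0.5, 1/20             # sigma, beta, gam
res = seir_model(init, parms, 300)   # shape (3000, 7): columns S, E, I, R, Epd, Ipd, Rpd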
Code
# Need this new function for model below:
def make_df(p,num_E, num_I, num_R):
  df = pd.DataFrame(np.full((p,1), 'S').T[0], columns=['State'])
  df['Day'] = 0
  tochange=df.loc[rng.choice(p, size=num_E+num_I+num_R, replace=False),'State'].index
  df.loc[tochange[0:num_E],'State'] = 'E'
  df.loc[tochange[num_E:num_I+num_E],'State'] = 'I'
  df.loc[tochange[num_E+num_I:num_E+num_I+num_R],'State'] = 'R'
  return df
Code
def seir_model_stoch(beta, p, num_E, num_I, num_R, days):

    # Initialize population dataframe with data given by user
    df = make_df(p,num_E, num_I, num_R)
    
    # This variable is used to track daily value of beta if it varies over time
    xxbeta=np.array([],dtype=float)

    # Initialize the arrays to return
    # Below are numbers of S, E, I, R total
    S=np.array([],dtype=int)
    E=np.array([],dtype=int)
    I=np.array([],dtype=int)
    R=np.array([],dtype=int)
    # Below are the daily additions in S, E, I, R
    Spd=np.array([],dtype=int)
    Epd=np.array([],dtype=int)
    Ipd=np.array([],dtype=int)
    Rpd=np.array([],dtype=int)

    b=beta
    
    # Stochastic model so use random values to decide on progression
    rand = np.random.random(size=(p,days))

    # Sample each individual's latent period T_Latent from a gamma distribution
    # (an exponential distribution could be used instead)
    EtoI = gamma.rvs(1.8,loc=0.9,scale=(5.2-1.8)/0.9,size=p)

    # Sample each individual's infectious period T_Infectious from a Weibull distribution
    # (exponential or gamma could be used instead)
    # Uses the distributions found in blog part 3
    ItoR = weibull_min.rvs(2.3, loc=2, scale=20.11, size=p)

    # Iterate over every day the simulation is run
    for j in range(0,days-1):

        # Record the beta value used on this day
        xxbeta=np.append(xxbeta, b[j])

        # First we get the index of the individuals that will change state today:

        # Random number tells you which 'S' have been exposed on this day 
        StoE_index = df.loc[(df.State == 'S') & (rand[:,j] < b[j]*len(np.where(df.State=='I')[0])/p)].index

        # For each row, if a person has been a certain number of days in E, they will go to I
        # This follows EtoI variable which is either exponential or gamma distributed according to above
        EtoI_index = df.loc[(df.State == 'E') & (j-df.Day >= EtoI)].index
        
        # Similarly to the above:
        # for each row, if a person has been a certain number of days in I, they will go to R
        # This follows the ItoR variable which is Weibull distributed according to above
        ItoR_index = df.loc[(df.State == 'I') & (j-df.Day >= ItoR)].index

        # Use indexes collected above to populate per day values
        Epd = np.append(Epd,len(StoE_index))
        Ipd = np.append(Ipd,len(EtoI_index))
        Rpd = np.append(Rpd,len(ItoR_index))

        # Now we use the indexes collected above to change the actual population dataframe to the new states
        df.loc[ItoR_index, 'State'] = 'R'
        df.loc[EtoI_index, 'State'] = 'I'
        df.loc[StoE_index, 'State'] = 'E'
        df.loc[ItoR_index, 'Day'] = j
        df.loc[EtoI_index, 'Day'] = j
        df.loc[StoE_index, 'Day'] = j
        df.loc[ItoR_index, 'DayR'] = j
        df.loc[EtoI_index, 'DayI'] = j
        df.loc[StoE_index, 'DayE'] = j
        
        # Append the S, E, I, and R arrays
        S=np.append(S,len(np.where(df.State=='S')[0]))
        E=np.append(E,len(np.where(df.State=='E')[0]))
        I=np.append(I,len(np.where(df.State=='I')[0]))
        R=np.append(R,len(np.where(df.State=='R')[0]))

        # Code below for control measures to reduce beta values
#        if ((I[-1] > 1000) & (Ipd[-1] > 399)): 
#            b = beta2
#        elif ((I[-1] > 1000) & (Ipd[-1] < 400)): 
#            b = beta3
                
    Epd[0]+=num_E
    Ipd[0]+=num_I
    Rpd[0]+=num_R

    return S,E,I,R, Epd, Ipd, Rpd, xxbeta, df

Motivation for write-up

This is the 7th part of a multi-part blog series on modeling in epidemiology.

The goal of this 7th installment is to expand on the notions seen in part 5.

Most notably, the idea is to expand on the notions of convolution and deconvolution and to see how they can be useful in describing an epidemic.

Available data

If you look at the COVID-19 trackers around the web, or even mine, you can get a sense of what data is available for study.

Generally speaking we have the following:

  • Total number of positive individuals
  • Daily number of newly diagnosed individuals
  • Daily number of recovered individuals
  • Daily number of deaths

Research has also produced rough estimates of both \(T_{Latent}\) and \(T_{Infectious}\).

About the data:

  • We don’t have data for exposure; can we recover it using deconvolution?
  • The data for some regions can be very sparse

Convolution

Convolution to get \(I_{pd}\)

We have seen that daily new infectious individuals is given by:

\[I_{pd}[j] = \sum_{n_L=0}^{M_L-1}h_L[n_L]~E_{pd}[j-n_L]= h_L[j]\circledast E_{pd}[j]\]

Where \(h_L[j]\) describes the distribution of \(T_{Latent}\).

Convolution to get \(R_{pd}\)

Similarly, we have seen that daily new recovered individuals (deaths + recoveries in fact) is given by:

\[R_{pd}[j] = \sum_{n_I=0}^{M_I-1}h_I[n_I]~I_{pd}[j-n_I]= h_I[j]\circledast I_{pd}[j]\]

Where \(h_I[j]\) describes the distribution of \(T_{Infectious}\).
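
As a quick sanity check on this discrete convolution, here is a toy example (the kernel and input below are made up for illustration) confirming that the explicit sum matches scipy’s FFT-based convolution:

Code
# Toy check: the explicit convolution sum equals signal.fftconvolve
# (h and x are hypothetical, chosen only for illustration)
h = np.array([0.0, 0.2, 0.5, 0.3])        # a made-up delay kernel
x = np.array([1.0, 4.0, 2.0, 0.0, 1.0])   # a made-up daily series
y_sum = np.array([sum(h[n]*x[j-n] for n in range(len(h)) if 0 <= j-n < len(x))
                  for j in range(len(h)+len(x)-1)])
y_fft = signal.fftconvolve(h, x, mode='full')
print(np.allclose(y_sum, y_fft))           # True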

Finding \(h_L[j]\) and \(h_I[j]\)

  • \(h_L[j]\) is simply the probability of an individual having a latent period of j days
  • \(h_I[j]\) is similarly the probability of an individual having an infectious period of j days
Code
days = np.arange(100)
cdf = pd.DataFrame({
    'T_Latent': gamma.cdf(days, 1.8,loc=0.9,scale=(5.2-1.8)/0.9), 
    'T_Infectious': weibull_min.cdf(days, 2.3,loc=2,scale=20.11)
    })
h_L = cdf.diff().T_Latent
h_I = cdf.diff().T_Infectious
h_L[0] = 0
h_I[0] = 0
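
Since both kernels are discretized probability mass functions, a quick sanity check is that each sums to approximately 1 over the 100-day window:

Code
# Both kernels should sum to ~1 (discretized pmfs over 100 days)
print(h_L.sum(), h_I.sum())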

Convolution in practice

Comparing \(I_{pd}\) from the model to \(I_{pd}\) obtained by convolving \(R_{pd}\)

First run the SEIR model to obtain the actual \(I_{pd}\):

Code
# Define parameters for stochastic model
days = 300
p = 10000
num_E = 1
num_I = 0
num_R = 0
beta_stoch = 0.5*np.ones(days)

# Run the stochastic simulation
results_stoch0 = seir_model_stoch(beta_stoch, p, num_E, num_I, num_R, days)

Now obtain \(h_L[j]\circledast E_{pd}[j]\):

Code
Ipd=signal.fftconvolve(h_L, results_stoch0[4], mode='full')
Code
fig = go.Figure(data=[    
    go.Scatter(name='Ipd_Actual', x=np.arange(len(results_stoch0[5])), y=results_stoch0[5]),
    go.Scatter(name='Ipd_convolved', x=np.arange(len(Ipd)), y=Ipd)
])

fig.update_layout(
    xaxis_title = 'Day',
    yaxis_title = 'Count',
    title={
        'text':r'$\text{Actual } I_{pd} \text{ vs. } h_L[j]\circledast E_{pd}[j]$',
        'x':0.5,
        'xanchor':'center'
    }
)

fig.show()

Comparing \(R_{pd}\) from the model to \(R_{pd}\) obtained by convolving \(I_{pd}\)

We can use the actual \(R_{pd}\) from the SEIR model above.

Now obtain \(h_I[j]\circledast I_{pd}[j]\):

Code
Rpd=signal.fftconvolve(h_I, results_stoch0[5], mode='full')
Code
fig = go.Figure(data=[    
    go.Scatter(name='Rpd_Actual', x=np.arange(len(results_stoch0[6])), y=results_stoch0[6]),
    go.Scatter(name='Rpd_convolved', x=np.arange(len(Rpd)), y=Rpd)
])

fig.update_layout(
    xaxis_title = 'Day',
    yaxis_title = 'Count',
    title={
        'text':r'$\text{Actual } R_{pd} \text{ vs. } h_I[j]\circledast I_{pd}[j]$',
        'x':0.5,
        'xanchor':'center'
    }
)

fig.show()

De-convolution

We get very close matches when convolving; can we obtain similar results by deconvolving?

Problems with de-convolution

Ill-posedness

Unfortunately, de-convolution is not as straightforward as convolution.

A quick recap of the math: in the frequency domain (Fourier or z-transform) we have

\[\mathcal{F} \{ f * g \} = \mathcal{F} \{ f \}~\mathcal{F} \{ g \}\]

And so if we have:

\[y = f * g\]

Then:

\[\mathcal{F} \{ y \} = \mathcal{F} \{ f \}~\mathcal{F} \{ g \}\]

And:

\[g = \mathcal{F}^{-1} \left(\frac{\mathcal{F}\{y\}}{\mathcal{F}\{f\}} \right)\]

But this last equation is often ill-posed.
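
To see why, here is a small synthetic illustration (the kernel, signal, and noise level are all made up) of naive Fourier deconvolution: wherever \(\mathcal{F}\{f\}\) is close to zero, even tiny noise in \(y\) is massively amplified:

Code
# Naive frequency-domain deconvolution on synthetic data: even tiny noise is
# amplified wherever the kernel's spectrum is close to zero
n = 128
f = np.exp(-0.5*((np.arange(n) - 10)/3.0)**2)   # made-up smooth kernel
f /= f.sum()
g_true = np.zeros(n); g_true[30:60] = 1.0       # made-up input signal
y = np.real(np.fft.ifft(np.fft.fft(f)*np.fft.fft(g_true)))  # circular f*g
y_noisy = y + rng.normal(0, 1e-3, n)            # add a little noise
g_est = np.real(np.fft.ifft(np.fft.fft(y_noisy)/np.fft.fft(f)))
print(np.abs(g_est - g_true).max())             # huge error despite tiny noise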

Noise in data

We can see above that the daily new data varies a lot from day to day due to the stochasticity of the model. This shows up as high-frequency components in the frequency domain.

Initial conditions and boundary-value problems

Boundary values refer to the values on day 0 and on the last day we have data for. Due to the way data is reported on JHU, we sometimes have erratic values here, most commonly a value of 0 for the latest day when the data isn’t reported on time. These gaps can create issues for deconvolution.

In the next blog post I get around this by simply averaging the last 3 days instead of using only the last day, as sketched below.
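
A minimal sketch of one reading of that fix (assuming x is a daily-count series whose last entry may be under-reported):

Code
# Sketch: replace the possibly under-reported last value with the mean of the
# last 3 reported values (x is a hypothetical daily-count series)
def smooth_last_day(x):
    x = np.asarray(x, dtype=float).copy()
    x[-1] = x[-3:].mean()
    return x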

Solutions to these problems

Iterative deconvolution

Multiple iterative deconvolution algorithms exist: Lucy-Richardson, Gold, and Van Cittert, among others.

Lucy-Richardson deconvolution has already been used, for example, to recover the daily incidence of the Spanish flu from death records in Philadelphia at the time 1.

An adaptation of these is used here; the basics are given below (see code for details).

Let’s use the equation for \(I_{pd}\) as an example:

\[I_{pd}[j] = \sum_{n_L=0}^{M_L-1}h_L[n_L]~E_{pd}[j-n_L]= h_L[j]\circledast E_{pd}[j]\]

The idea is that after an initial guess for \(E_{pd}\) we can iteratively find a better one.

With the \(n^{th}\) guess for \(E_{pd}\) written as \(E_{pd, n}\) we have:

\[I_{pd}[j]= h_L[j]\circledast E_{pd, n}[j]\] \[\leftrightarrow 0 = I_{pd}[j] - h_L[j]\circledast E_{pd, n}[j]\] \[\leftrightarrow E_{pd,n+1} = E_{pd,n} + I_{pd}[j] - h_L[j]\circledast E_{pd, n}[j]\]

Where \(O[j] = I_{pd}[j] - h_L[j]\circledast E_{pd, n}[j]\) is the error term.

Hence we want to minimize \(O[j]\).

Initial guess

We know \(h_L\) and \(h_I\) act in part as simple delay functions, so a natural first guess is the output signal shifted back by the delay introduced by the impulse response.

Low-pass filter

To get around those high-frequency components in the data, we can simply use a low-pass filter, implemented as a Butterworth filter below:

The basic effect is that it smooths the daily values, much like the smoothing we get when convolving the previous state’s daily values.

Code
def lowpass(x, fc=0.05):
  fs = 1                             # Sampling frequency: 1 sample per day
  # fc is the cut-off frequency of the filter
  w = fc / (fs / 2)                  # Normalize the frequency
  b, a = signal.butter(5, w, 'low')  # 5th-order Butterworth low-pass
  return signal.filtfilt(b, a, x)    # Zero-phase (forward-backward) filtering
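
As a usage sketch, the filter can be applied directly to the noisy daily series from the stochastic run above; the cut-off fc trades smoothness against fidelity:

Code
# Usage sketch on the daily exposures from the stochastic run above
Epd_smooth = lowpass(results_stoch0[4])             # default cut-off fc=0.05
Epd_smoother = lowpass(results_stoch0[4], fc=0.02)  # lower fc gives a smoother curve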

Deconvolution in practice

Coding the iterative deconvolution

Code
# Let's define an iteration function:
def iter_deconv(alpha, impulse_response, input_signal, delay, comparator):
  # Convolve the current guess with the impulse response
  conv=signal.fftconvolve(impulse_response, input_signal, mode='full')
  # Error between the target signal and the convolved guess, realigned by the delay
  correction=np.roll(comparator-conv[:len(comparator)], delay)
  # Apply the scaled correction, smooth it, and keep the values integer and non-negative
  input_signal=np.floor(lowpass(alpha*correction+input_signal))
  input_signal[input_signal<0]=0
  return input_signal

# Define a function to return MSE between two signals as a measure of goodness of fit
def msecalc(A, B):
  return ((A - B)**2).mean(axis=0)

Comparing \(E_{pd}\) from the model to \(E_{pd}\) obtained by de-convolving \(I_{pd}\)

Code
# Regularization parameter
alpha=2

# Set up the resultant Ipd we want to compare our guess with
Ipd=np.floor(lowpass(results_stoch0[5]))
Ipd[Ipd<0]=0

# Find delay caused by h_L
delay=Ipd.argmax()-signal.fftconvolve(Ipd, h_L, mode='full').argmax()

# The initial guess is simply the target signal shifted back by the delay
initial_guess=np.roll(Ipd,delay)
Enext = initial_guess

# An array to record MSE between the target and our iterated guess
mse=np.array([])
mse=np.append(mse, 100000)
mse=np.append(mse, msecalc(Ipd, signal.fftconvolve(h_L, Enext, mode='full')[:len(Ipd)]))

itercount=0
while mse[-1] < mse[-2]:
  itercount=itercount+1
  Enext=iter_deconv(alpha, h_L, Enext, delay, Ipd)
  mse=np.append(mse, msecalc(Ipd, signal.fftconvolve(h_L, Enext, mode='full')[:len(Ipd)]))
  print("Iteration #" + str(itercount) +": MSE= "+str(mse[itercount]))
print("Iteration #" + str(itercount+1) +": MSE= "+str(mse[-1])+" so we use the result of the previous iteration.")
Iteration #1: MSE= 130.07201423939554
Iteration #2: MSE= 28.985923141326676
Iteration #3: MSE= 21.14637589704021
Iteration #4: MSE= 15.712400523386
Iteration #5: MSE= 12.376728037642959
Iteration #6: MSE= 10.371422096439822
Iteration #7: MSE= 7.807135996773338
Iteration #8: MSE= 7.115442105152689
Iteration #9: MSE= 5.243210227637416
Iteration #10: MSE= 4.887916535313005
Iteration #11: MSE= 3.789243188684947
Iteration #12: MSE= 3.6143739648770974
Iteration #13: MSE= 2.857605866687358
Iteration #14: MSE= 2.8116375311783885
Iteration #15: MSE= 2.376734514666326
Iteration #16: MSE= 2.423265713368301 so we use the result of the previous iteration.
Code
fig = go.Figure(data=[    
    go.Scatter(name='E_pd', x=np.arange(150), y=results_stoch0[4]),
    go.Scatter(name='Epd=lowpass(E_pd)', x=np.arange(150), y=lowpass(results_stoch0[4])),
    go.Scatter(name='Epd=deconv(I_pd)', x=np.arange(150), y=Enext),
    go.Scatter(name='I_pd', x=np.arange(150), y=results_stoch0[5]),
    go.Scatter(name='Ipd=lowpass(I_pd)', x=np.arange(150), y=lowpass(results_stoch0[5])),
    go.Scatter(name='Ipd=conv(E_pd)', x=np.arange(150), y=signal.fftconvolve(h_L, lowpass(results_stoch0[4]), mode='full')),
    go.Scatter(name='Ipd=conv(deconv(Ipd))', x=np.arange(150), y=signal.fftconvolve(h_L, Enext, mode='full')[:len(Ipd)])
])

fig.update_layout(
    
    xaxis_title = 'Day',
    yaxis_title = 'Count',
    title={
        'text':r'$\text{Actual } E_{pd} \text{ vs. deconvolution of } I_{pd}$',
        'x':0.5,
        'xanchor':'center'
    }
)

fig.show()

The iterative deconvolution seems to work nicely and we get close results.

Comparing \(I_{pd}\) from the model to \(I_{pd}\) obtained by de-convolving \(R_{pd}\)

Let’s try the same thing but using \(R_{pd}\) to get \(I_{pd}\):

Code
# Regularization parameter
alpha=2

# Set up the resultant Rpd we want to compare our guess with
Rpd=np.floor(lowpass(results_stoch0[6]))
Rpd[Rpd<0]=0

# Find delay caused by h_I
delay=Rpd.argmax()-signal.fftconvolve(Rpd, h_I, mode='full').argmax()

# The initial guess is simply the target signal shifted back by the delay
initial_guess=np.roll(Rpd,delay)
Inext = initial_guess


# An array to record MSE between the target and our iterated guess
mse=np.array([])
mse=np.append(mse, 100000)
mse=np.append(mse, msecalc(Rpd, signal.fftconvolve(h_I, Inext, mode='full')[:len(Rpd)]))

itercount=0
while mse[-1] < mse[-2]:
  itercount=itercount+1
  Inext=iter_deconv(alpha, h_I, Inext, delay, Rpd)
  mse=np.append(mse, msecalc(Rpd, signal.fftconvolve(h_I, Inext, mode='full')[:len(Rpd)]))
  print("Iteration #" + str(itercount) +": MSE= "+str(mse[itercount]))
print("Iteration #" + str(itercount+1) +": MSE= "+str(mse[-1])+" so we use the result of the previous iteration.")
Iteration #1: MSE= 183.9549261264147
Iteration #2: MSE= 36.4813973184298
Iteration #3: MSE= 18.45240430779509
Iteration #4: MSE= 11.032242359194717
Iteration #5: MSE= 8.923820246583984
Iteration #6: MSE= 6.676254645792155
Iteration #7: MSE= 5.615808839920469
Iteration #8: MSE= 4.308264204337396
Iteration #9: MSE= 3.682767695460665
Iteration #10: MSE= 2.8722567678200193
Iteration #11: MSE= 2.5674870609908087
Iteration #12: MSE= 2.147294049205842
Iteration #13: MSE= 2.01828193955411
Iteration #14: MSE= 1.702522285527881
Iteration #15: MSE= 1.6490031212266356
Iteration #16: MSE= 1.476551953269413
Iteration #17: MSE= 1.4487934342939142
Iteration #18: MSE= 1.299273631101706
Iteration #19: MSE= 1.2921686217653456
Iteration #20: MSE= 1.184990178055593
Iteration #21: MSE= 1.2016201858482565 so we use the result of the previous iteration.
Code
fig = go.Figure(data=[    
    go.Scatter(name='Ipd', x=np.arange(150), y=results_stoch0[5]),
    go.Scatter(name='Ipd=lowpass(Ipd)', x=np.arange(150), y=lowpass(results_stoch0[5])),
    go.Scatter(name='Ipd=deconv(Rpd)', x=np.arange(150), y=Inext),
    go.Scatter(name='Rpd', x=np.arange(150), y=results_stoch0[6]),
    go.Scatter(name='Rpd=conv(Inext)', x=np.arange(150), y=signal.fftconvolve(h_I, Inext, mode='full')),
    go.Scatter(name='Rpd=lowpass(Rpd)', x=np.arange(150), y=lowpass(results_stoch0[6])),
    go.Scatter(name='Rpd=conv(deconv(Rpd))', x=np.arange(150), y=signal.fftconvolve(h_I, Inext, mode='full')[:len(Rpd)])    
])

fig.update_layout(
    
    xaxis_title = 'Day',
    yaxis_title = 'Count',
    title={
        'text':r'$\text{Actual } I_{pd} \text{ vs. deconvolution of } R_{pd}$',
        'x':0.5,
        'xanchor':'center'
    }
)

fig.show()

Again our algorithm works nicely.

Discussion

There is a lot of published research in the field of deconvolution in general, but I have found only one paper using it in epidemic models (linked above).

The theory is skipped here but it is required if we want to use more advanced techniques. I may write another blog post with more details.

After all this you might be left wondering: what’s the point?

A few things:

  • If we have daily data of infectious \(I_{pd}\) we can then use the deconvolution technique to find an approximation for \(E_{pd}\)
  • If we have a close estimate for \(E_{pd}\) we can in turn estimate the value of \(\beta\) or \(R\) (a rough sketch follows at the end of this post)
  • We can verify the robustness of \(h_L\) and \(h_I\) against real-world data, and how these vary geographically in a pandemic

These points will be studied a bit more in the next blog post.
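
As a preview of the second point, here is a rough sketch of recovering \(\beta\) over time. In the stochastic model each susceptible is exposed with probability \(\beta[j]\,I[j]/p\), so the expected daily exposures are \(E_{pd}[j] \approx \beta[j]\,S[j]\,I[j]/p\), which we can invert wherever \(S\) and \(I\) are non-zero. The day-indexing alignment here is approximate, and the smoothing introduces some bias at the edges:

Code
# Rough sketch: invert Epd[j] ~ beta[j]*S[j]*I[j]/p to estimate beta over time
S_t = results_stoch0[0].astype(float)
I_t = results_stoch0[2].astype(float)
Epd_t = lowpass(results_stoch0[4])               # smooth the noisy daily counts
mask = (S_t > 0) & (I_t > 0) & (Epd_t > 1)       # keep days with usable signal
beta_est = np.full_like(S_t, np.nan)
beta_est[mask] = Epd_t[mask] * p / (S_t[mask] * I_t[mask])
print(np.nanmedian(beta_est))                    # roughly near beta_stoch = 0.5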