import pandas as pd
import numpy as np
import math
from scipy import signal
import plotly.graph_objects as go
import plotly.express as px
from scipy.stats import expon
from scipy.stats import gamma
from scipy.stats import weibull_min
from numpy.random import default_rng
rng = default_rng()
import plotly
import plotly.io as pio
from IPython.display import display, HTML

## Tomas Mazak's workaround
plotly.offline.init_notebook_mode()
display(HTML(
    #'<script type="text/javascript" async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js?config=TeX-MML-AM_SVG"></script>'
    '<script type="text/javascript" async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/MathJax.js?config=TeX-MML-AM_SVG"></script>'
    '<script src="https://cdn.plot.ly/plotly-3.0.1.js" charset="utf-8"></script>'
))
pio.renderers.default = "plotly_mimetype+notebook_connected"
pio.templates.default = "plotly_dark"

# Let's build a numerical solution
def seir_model(init, parms, days):
    S_0, E_0, I_0, R_0 = init
    Epd, Ipd, Rpd = [0], [0], [0]
    S, E, I, R = [S_0], [E_0], [I_0], [R_0]
    dt = 0.1
    t = np.linspace(0, days, int(days/dt))
    sigma, beta, gam = parms
    for _ in t[1:]:
        next_S = S[-1] - beta*S[-1]*I[-1]*dt
        Epd.append(beta*S[-1]*I[-1]*dt)
        next_E = E[-1] + (beta*S[-1]*I[-1] - sigma*E[-1])*dt
        Ipd.append(sigma*E[-1]*dt)
        next_I = I[-1] + (sigma*E[-1] - gam*I[-1])*dt
        Rpd.append(gam*I[-1]*dt)
        next_R = R[-1] + (gam*I[-1])*dt
        S.append(next_S)
        E.append(next_E)
        I.append(next_I)
        R.append(next_R)
    return np.stack([S, E, I, R, Epd, Ipd, Rpd]).T
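As a quick usage sketch, here is what a run could look like; the initial conditions and rates below are illustrative assumptions, not values calibrated in this series.

Code

# Illustrative run: sigma = 1/5.2, beta = 0.5, gamma = 1/20 (assumed values)
init = (0.9999, 0.0001, 0, 0)   # S_0, E_0, I_0, R_0 as population fractions
parms = (1/5.2, 0.5, 1/20)      # sigma, beta, gamma
results = seir_model(init, parms, days=150)
print(results.shape)            # (1500, 7): S, E, I, R, Epd, Ipd, Rpd at dt = 0.1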
Code
# Need this new function for model below:
def make_df(p, num_E, num_I, num_R):
    df = pd.DataFrame(np.full((p, 1), 'S').T[0], columns=['State'])
    df['Day'] = 0
    tochange = df.loc[rng.choice(p, size=num_E+num_I+num_R, replace=False), 'State'].index
    df.loc[tochange[0:num_E], 'State'] = 'E'
    df.loc[tochange[num_E:num_I+num_E], 'State'] = 'I'
    df.loc[tochange[num_E+num_I:num_E+num_I+num_R], 'State'] = 'R'
    return df
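A quick sanity check of the initial population (the numbers here are arbitrary):

Code

# 100 individuals: 5 exposed, 2 infectious, 0 recovered, the rest susceptible
df0 = make_df(100, 5, 2, 0)
print(df0.State.value_counts())   # expect S: 93, E: 5, I: 2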
Code
def seir_model_stoch(beta, p, num_E, num_I, num_R, days):

    # Initialize population dataframe with data given by user
    df = make_df(p, num_E, num_I, num_R)

    # This variable is used to track daily value of beta if it varies over time
    xxbeta = np.array([], dtype=float)

    # Initialize the arrays to return
    # Below are numbers of S, E, I, R total
    S = np.array([], dtype=int)
    E = np.array([], dtype=int)
    I = np.array([], dtype=int)
    R = np.array([], dtype=int)
    # Below are the daily additions in S, E, I, R
    Spd = np.array([], dtype=int)
    Epd = np.array([], dtype=int)
    Ipd = np.array([], dtype=int)
    Rpd = np.array([], dtype=int)

    b = beta

    # Stochastic model so use random values to decide on progression
    rand = np.random.random(size=(p, days))

    # Depending if you want exponential or gamma distribution for T_Latent
    EtoI = gamma.rvs(1.8, loc=0.9, scale=(5.2-1.8)/0.9, size=p)

    # Depending if you want exponential, gamma, or Weibull distribution for T_Infectious
    # Uses distributions found on blog part 3
    ItoR = weibull_min.rvs(2.3, loc=2, scale=20.11, size=p)

    # Iterate over every day the simulation is run
    for j in range(0, days-1):

        # Record the beta value used on this day
        xxbeta = np.append(xxbeta, b[j])

        # First we get the index of the individuals that will change state today:
        # Random number tells you which 'S' have been exposed on this day
        StoE_index = df.loc[(df.State == 'S') & (rand[:, j] < b[j]*len(np.where(df.State == 'I')[0])/p)].index

        # For each row, if a person has been a certain number of days in E, they will go to I
        # This follows the EtoI variable, which is exponential or gamma distributed as chosen above
        EtoI_index = df.loc[(df.State == 'E') & (j - df.Day >= EtoI)].index

        # Similarly, if a person has been a certain number of days in I, they will go to R
        # This follows the ItoR variable, distributed as chosen above
        ItoR_index = df.loc[(df.State == 'I') & (j - df.Day >= ItoR)].index

        # Use indexes collected above to populate per-day values
        Epd = np.append(Epd, len(StoE_index))
        Ipd = np.append(Ipd, len(EtoI_index))
        Rpd = np.append(Rpd, len(ItoR_index))

        # Now we use the indexes collected above to change the actual population dataframe to the new states
        df.loc[ItoR_index, 'State'] = 'R'
        df.loc[EtoI_index, 'State'] = 'I'
        df.loc[StoE_index, 'State'] = 'E'
        df.loc[ItoR_index, 'Day'] = j
        df.loc[EtoI_index, 'Day'] = j
        df.loc[StoE_index, 'Day'] = j
        df.loc[ItoR_index, 'DayR'] = j
        df.loc[EtoI_index, 'DayI'] = j
        df.loc[StoE_index, 'DayE'] = j

        # Append the S, E, I, and R arrays
        S = np.append(S, len(np.where(df.State == 'S')[0]))
        E = np.append(E, len(np.where(df.State == 'E')[0]))
        I = np.append(I, len(np.where(df.State == 'I')[0]))
        R = np.append(R, len(np.where(df.State == 'R')[0]))

        # Code below for control measures to reduce beta values
        # if ((I[-1] > 1000) & (Ipd[-1] > 399)):
        #     b = beta2
        # elif ((I[-1] > 1000) & (Ipd[-1] < 400)):
        #     b = beta3

    # Add the seeded individuals to day 0 of the per-day counts
    Epd[0] += num_E
    Ipd[0] += num_I
    Rpd[0] += num_R

    return S, E, I, R, Epd, Ipd, Rpd, xxbeta, df
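The cells further down index `results_stoch0[5]` and `results_stoch0[6]` (the \(I_{pd}\) and \(R_{pd}\) outputs), but the run that produces `results_stoch0` is not shown in this post. It presumably looks something like the following, where the population size, seeding, and \(\beta\) values are illustrative assumptions (note that `beta` must be an array with one value per day, since the model indexes `b[j]`):

Code

days = 150
p = 10000                        # population size (assumption)
beta_arr = np.full(days, 0.5)    # constant daily beta (assumption)
results_stoch0 = seir_model_stoch(beta_arr, p, num_E=5, num_I=0, num_R=0, days=days)
S, E, I, R, Epd, Ipd, Rpd, betas, dfpop = results_stoch0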
Motivation for write-up
This is the 7th part of a multi-part blog series on modeling in epidemiology.
The goal of this 7th installment is to expand on the notions seen in part 5.
Most notably, the idea is to revisit convolution and deconvolution and to see how they can be useful in describing an epidemic.
Available data
If you look at the COVID-19 trackers around the web, or even mine, you can get a sense of what data is available for study.
Generally speaking, we have the following:
Total number of positive individuals
Daily number of newly diagnosed individuals
Daily number of recovered individuals
Daily number of deaths
Research has also produced some rough estimates of both \(T_{Latent}\) and \(T_{Infectious}\).
About the data:
We don’t have data for exposure; how can we recover it using deconvolution?
The data for some regions can be very sparse
Convolution
Convolution to get \(I_{pd}\)
We have seen that the daily number of new infectious individuals is given by:

\[I_{pd}[j] = h_L[j] \circledast E_{pd}[j]\]

We can see above that the daily new data varies a lot from day to day due to the stochasticity of the model. This results in high-frequency components in the frequency domain.
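The impulse response \(h_L\) was constructed in an earlier part of this series and is not redefined here. As a hedged sketch, assuming \(h_L\) is the discretized and normalized latent-period distribution (the same gamma distribution used for `EtoI` in the stochastic model), the forward convolution looks like this:

Code

# Assumption: h_L as the discretized gamma latent-period distribution (matching EtoI above)
tt = np.arange(days)
h_L = gamma.pdf(tt, 1.8, loc=0.9, scale=(5.2-1.8)/0.9)
h_L = h_L / h_L.sum()     # normalize so total counts are conserved

# Convolving daily new exposed with h_L gives daily new infectious
Ipd_conv = signal.fftconvolve(h_L, Epd, mode='full')[:len(Epd)]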
Initial conditions and boundary-value problems
Boundary values refer to the values on day 0 and the last day we have data for. Due to the way data is reported by JHU, we sometimes have crazy values here, most commonly a value of 0 for the latest day if the data isn’t reported on time, for example. These gaps can create issues for deconvolution.
In the next blog post I get around this by simply averaging the last 3 days instead of using only the last day.
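One literal reading of that fix, as a minimal sketch using the stochastic run’s \(I_{pd}\) as a stand-in for reported data:

Code

daily = results_stoch0[5].astype(float)   # any daily series, e.g. Ipd
daily[-1] = daily[-3:].mean()             # replace a suspect last point with a 3-day average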
Solutions to these problems
Iterative deconvolution
Multiple iterative deconvolution algorithms exist: Lucy-Richardson, Gold, and Van Cittert, among others.
LR has already been used to recover the daily incidence of the Spanish flu from death records in Philadelphia, for example 1.
An adaptation of the algorithms above is used here, with the basics below (see code for details):
Let’s use the equation for \(I_{pd}\) as an example. Each iteration refines the current guess \(E_{pd, n}\) with a relaxed correction:

\[E_{pd, n+1}[j] = E_{pd, n}[j] + \alpha \, O[j]\]

where \(O[j] = I_{pd}[j] - h_L[j]\circledast E_{pd, n}[j]\) is the error term and \(\alpha\) is the regularization parameter.
Hence we want to minimize \(O[j]\).
Initial guess
We know our \(h_L\) and \(h_I\) are in part simply delay functions, so a natural first guess is the output signal itself, shifted back by the delay introduced by the impulse response.
Low-pass filter
To get around those high-frequency components in the data, we can simply use a low-pass filter, implemented as a Butterworth filter below:
The basic effect is that it smooths the daily values, similar to the smoothing we get when convolving from the previous state’s daily values.
Code
def lowpass(x, fc=0.05):
    fs = 1    # Sampling frequency: one sample per day
    t = np.arange(len(x))    # Number of days, as in the SEIR model
    signala = x
    # fc = 0.05 is the cut-off frequency of the filter
    w = fc / (fs / 2)    # Normalize the frequency to the Nyquist frequency
    b, a = signal.butter(5, w, 'low')
    return signal.filtfilt(b, a, signala)
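For example, smoothing the stochastic model’s noisy daily infectious counts (relying on the `results_stoch0` run sketched earlier):

Code

Ipd_raw = results_stoch0[5]    # daily new infectious from the stochastic run
Ipd_smooth = lowpass(Ipd_raw)  # 5th-order Butterworth, zero-phase via filtfilt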
Deconvolution in practice
Coding the iterative deconvolution
Code
# Let's define an iteration function:
def iter_deconv(alpha, impulse_response, input_signal, delay, comparator):
    # Convolve the current guess with the impulse response
    conv = signal.fftconvolve(impulse_response, input_signal, mode='full')
    # Error term between the target and the convolution, shifted by the known delay
    correction = np.roll(comparator - conv[:len(comparator)], delay)
    # Apply the relaxed correction, smooth it, and keep whole non-negative counts
    input_signal = np.floor(lowpass(alpha*correction + input_signal))
    input_signal[input_signal < 0] = 0
    return input_signal

# Define a function to return MSE between two signals as a measure of goodness of fit
def msecalc(A, B):
    return ((A - B)**2).mean(axis=0)
Comparing \(E_{pd}\) from the model to \(E_{pd}\) obtained by de-convolving \(I_{pd}\)
Code
# Regularization parameter
alpha = 2

# Set up the resultant Ipd we want to compare our guess with
Ipd = np.floor(lowpass(results_stoch0[5]))
Ipd[Ipd < 0] = 0

# Find delay caused by h_L
delay = Ipd.argmax() - signal.fftconvolve(Ipd, h_L, mode='full').argmax()

# We want the initial guess to simply be the result of the convolution, delayed
initial_guess = np.roll(Ipd, delay)
Enext = initial_guess

# An array to record MSE between the result we want and our iterated guess
mse = np.array([])
mse = np.append(mse, 100000)
mse = np.append(mse, msecalc(Ipd, signal.fftconvolve(h_L, Enext, mode='full')[:len(Ipd)]))

itercount = 0
while mse[-1] < mse[-2]:
    itercount = itercount + 1
    Enext = iter_deconv(alpha, h_L, Enext, delay, Ipd)
    mse = np.append(mse, msecalc(Ipd, signal.fftconvolve(h_L, Enext, mode='full')[:len(Ipd)]))
    print("Iteration #" + str(itercount) + ": MSE= " + str(mse[itercount]))
print("Iteration #" + str(itercount+1) + ": MSE= " + str(mse[-1]) + " so we use the result of the previous iteration.")
The iterative deconvolution seems to work nicely, and the recovered signal is close to the model’s \(E_{pd}\).
Comparing \(I_{pd}\) from the model to \(I_{pd}\) obtained by de-convolving \(R_{pd}\)
Let’s try the same thing but using \(R_{pd}\) to get \(I_{pd}\):
Code
# Regularization parameter
alpha = 2

# Set up the resultant Rpd we want to compare our guess with
Rpd = np.floor(lowpass(results_stoch0[6]))
Rpd[Rpd < 0] = 0

# Find delay caused by h_I
delay = Rpd.argmax() - signal.fftconvolve(Rpd, h_I, mode='full').argmax()

# We want the initial guess to simply be the result of the convolution, delayed
initial_guess = np.roll(Rpd, delay)
Inext = initial_guess

# An array to record MSE between the result we want and our iterated guess
mse = np.array([])
mse = np.append(mse, 100000)
mse = np.append(mse, msecalc(Rpd, signal.fftconvolve(h_I, Inext, mode='full')[:len(Rpd)]))

itercount = 0
while mse[-1] < mse[-2]:
    itercount = itercount + 1
    Inext = iter_deconv(alpha, h_I, Inext, delay, Rpd)
    mse = np.append(mse, msecalc(Rpd, signal.fftconvolve(h_I, Inext, mode='full')[:len(Rpd)]))
    print("Iteration #" + str(itercount) + ": MSE= " + str(mse[itercount]))
print("Iteration #" + str(itercount+1) + ": MSE= " + str(mse[-1]) + " so we use the result of the previous iteration.")
There is a lot of research on deconvolution in general, but I have found only one paper using it in epidemic models (linked above).
The theory is skipped here but it is required if we want to use more advanced techniques. I may write another blog post with more details.
After doing this you might be left wondering: well, what’s the point?
A few things:
If we have daily data for new infectious individuals \(I_{pd}\), we can use the deconvolution technique to find an approximation of \(E_{pd}\)
If we have a close estimate of \(E_{pd}\), we can in turn estimate the value of \(\beta\) or \(R\) (see the sketch after this list)
We can verify the robustness of \(h_L\) and \(h_I\) against real-world data and see how they vary geographically in a pandemic
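As a rough sketch of the second point, and only as an assumption derived from this model’s exposure rule \(E_{pd}[j] \approx \beta[j]\, S[j-1]\, I[j-1] / p\), a daily estimate of \(\beta\) could be backed out like this (days with \(I = 0\) are undefined):

Code

# Invert the stochastic model's exposure rule to estimate the daily beta
# (uses the arrays unpacked from the results_stoch0 run sketched earlier)
with np.errstate(divide='ignore', invalid='ignore'):
    beta_est = Epd[1:] * p / (S[:-1] * I[:-1])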
These points will be studied a bit more in the next blog post.