Covid Modeling with PyMC3¶
Attribution¶
This work is based on the work done by the Priesemann Group for inferring the parameters for COVID-19 and performing predictions. An overview of the methods can be found here.
Goal¶
Obtain the data that has the number of COVID-19 cases starting from January for each country.
Select a country of choice to infer the COVID-19 parameters and extract the number of confirmed cases (You will need the total population of the country that you select).
Use the SIR model as a disease model ([Notebook].(https://github.com/sjster/Epidemic/blob/master/Epidemic.ipynb)). This is a set of non-linear differential equations that are used to model disease propagation.
Setup a PyMC3 model to infer the SIR parameters from the number of confirmed cases (S,I, mu, lambda).
a. Select appropriate priors for each variable.
b. Use a Lognormal distribution for I_begin.
c. λ is the fraction of people that are newly infected each day. Use a Lognormal distribution for this.
d. μ is the fraction of people that recover each day. Use a Lognormal distribution.
e. The prior of the error of observed cases can use a Half Cauchy distribution.
Predict cases into the future.
a. Compare the predictions with the real observations and compute the error.
b. Note how the error varies as you increase the number of days chosen for the forecast.
Use appropriate metadata stores for experiment management. I have used the shelve module in Python but experiment with MLflow.
Classes to perform the modeling¶
COVID_data is the class for data ingestion
Pass a country and the population of the country to initialize this class
Set the dates to obtain case information
covid_obj = COVID_data('US', Population=328.2e6) covid_obj.get_dates(data_begin='2/1/20', data_end='9/28/20')
SIR_model and SIR_model_sunode are the two classes that help to model and solve the set of ODEs that is the SIR model for disease modeling. Use the ‘sunode’ model since this is much faster.
sir_model = SIR_model_sunode(covid_obj)
Set the likelihood and prior distribution information in a dictionary
likelihood = {'distribution': 'lognormal', 'sigma': 2} prior = {'lam': 1.0, 'mu': 0.5, 'lambda_std': 1.0, 'mu_std': 0.2 }
Run the model by passing the number of samples, the number of tuning samples along with the likelihood and the prior
fig1 = sir_model.run_SIR_model(n_samples=2000, n_tune=1000, likelihood=likelihood, prior=prior)
Example¶
covid_obj = COVID_data('US', Population=328.2e6)
covid_obj.get_dates(data_begin='2/1/20', data_end='9/28/20')
sir_model = SIR_model_sunode(covid_obj)
likelihood = {'distribution': 'lognormal',
'sigma': 2}
prior = {'lam': 1.0,
'mu': 0.5,
'lambda_std': 1.0,
'mu_std': 0.2 }
fig1 = sir_model.run_SIR_model(n_samples=2000, n_tune=1000, likelihood=likelihood, prior=prior)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly
import plotly.figure_factory as ff
import scipy.stats
import pymc3 as pm
import arviz as az
import sunode
import sunode.wrappers.as_theano
from pymc3.ode import DifferentialEquation
import theano.tensor as tt
import theano
import datetime
import shelve
from datetime import datetime as dt
import time
# -------- Usage --------#
# covid_obj = COVID_data('US', Population=328.2e6)
# covid_obj.get_dates(data_begin='7/11/20', data_end='7/20/20')
# sir_model = SIR_model(covid_obj)
# likelihood = {'distribution': 'lognormal', 'sigma': 2}
# prior= {'lam': 0.4, 'mu': 1/8, lambda_std', 0.5 'mu_std': 0.5 }
# sir_model.run_SIR_model(n_samples=20, n_tune=10, likelihood=likelihood)
np.random.seed(0)
class COVID_data():
def __init__(self, country='US', Population = 328.2e6):
confirmed_cases_url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv'
self.confirmed_cases = pd.read_csv(confirmed_cases_url, sep=',')
deaths_url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv'
self.deaths = pd.read_csv(deaths_url, sep=',')
path_to_save = ''
# ------------------------- Country for inference -------------------
self.country = country
self.N = Population # Population of the country
# Germany - 83.7e6
# US - 328.2e6
def get_dates(self, data_begin='7/11/20', data_end='7/20/20'):
# ------------------------- Date for inference ----------------------#
self.data_begin = data_begin #Take the data until yesterday
self.data_end = data_end
self.num_days_to_predict = 14
confirmed_cases = self.confirmed_cases
country = self.country
self.cases_country = confirmed_cases.loc[confirmed_cases["Country/Region"] == country]
self.cases_obs = np.array(confirmed_cases.loc[confirmed_cases["Country/Region"] == country, data_begin:data_end])[0]
print("------------ Cases for selected period ----------- ",self.cases_obs)
date_data_end = confirmed_cases.loc[confirmed_cases["Country/Region"] == self.country, data_begin:data_end].columns[-1]
month, day, year = map(int,date_data_end.split('/'))
date_data_end = datetime.date(year+2000, month, day)
date_today = date_data_end + datetime.timedelta(days=1)
print("------------- Cases yesterday ({}): {} and day before yesterday: {} ------------".format(date_data_end.isoformat(), *self.cases_obs[:-3:-1]))
self.num_days = len(self.cases_obs)
day_before_start = dt.strptime(data_end, '%m/%d/%y') + datetime.timedelta(days=-1)
day_before_start_cases = np.array(self.cases_country.loc[:, day_before_start.strftime('%-m/%-d/%-y')])
print("------------ Day before start and cases for that date ------------", day_before_start, day_before_start_cases)
future_days_begin = dt.strptime(data_end, '%m/%d/%y') + datetime.timedelta(days=1)
future_days_end = future_days_begin + datetime.timedelta(days=self.num_days_to_predict)
self.future_days_begin_s = future_days_begin.strftime('%-m/%-d/%-y')
self.future_days_end_s = future_days_end.strftime('%-m/%-d/%-y')
print("------------- Future date begin and end -------------",self.future_days_begin_s, self.future_days_end_s)
self.future_days = np.array(self.cases_country.loc[:, self.future_days_begin_s : self.future_days_end_s])[0]
print("------------- Future days cases ------------", self.future_days)
class SIR_model():
def __init__(self, covid_data) :
# ------------------------- Covid_data object -----------------------#
self.covid_data = covid_data
# ------------------------- Setup SIR model, but has to be called explicitly to run ------------------------#
self.setup_SIR_model()
def SIR_non_normalized(self, y, t, p):
ds = -p[0] * y[0] * y[1] /self.covid_data.N
di = p[0] * y[0] * y[1] / self.covid_data.N - p[1] * y[1]
return [ds, di]
def setup_SIR_model(self):
self.time_range = np.arange(0,len(self.covid_data.cases_obs),1)
self.I0 = self.covid_data.cases_obs[0]
self.S0 = self.covid_data.N - self.I0
# SIR model
self.sir_model_non_normalized = DifferentialEquation(
func=self.SIR_non_normalized,
times=self.time_range[1:],
n_states=2,
n_theta=2,
t0=0)
def run_SIR_model(self, n_samples, n_tune, likelihood, prior):
# ------------------------- Metadata --------------------------------#
now = dt.now()
timenow = now.strftime("%d-%m-%Y_%H:%M:%S")
self.filename = 'sir_' + self.covid_data.data_begin.replace('/','-') + '_' + \
self.covid_data.data_end.replace('/','-') + '_' + timenow
self.likelihood = likelihood
self.n_samples = n_samples
self.n_tune = n_tune
self.likelihood = likelihood
self.prior = prior
# ------------------------ Write out metadata while the model is running -------------------#
metadata_db_filename = 'metadata_db.db'
t = time.time()
with pm.Model() as model4:
sigma = pm.HalfCauchy('sigma', likelihood['sigma'], shape=1)
lam = pm.Lognormal('lambda', prior['lam'], prior['lambda_std'])
mu = pm.Lognormal('mu', prior['mu'], prior['mu_std'])
res = self.sir_model_non_normalized(y0=[self.S0, self.I0], theta=[lam, mu])
if(likelihood['distribution'] == 'lognormal'):
Y = pm.Lognormal('Y', mu=pm.math.log(res.take(0, axis=1)), sigma=sigma, observed=self.covid_data.cases_obs[1:])
else:
Y = pm.StudentT( "Y", nu=likelihood['nu'], # likelihood distribution of the data
mu=res.take(0, axis=1), # likelihood distribution mean, these are the predictions from SIR
sigma=sigma,
observed=cases_obs[1:]
)
trace = pm.sample(self.n_samples, tune=self.n_tune, target_accept=0.9, cores=2)
data = az.from_pymc3(trace=trace)
t1 = time.time() - t
az.plot_posterior(data, round_to=2, credible_interval=0.95)
axes = az.plot_trace(trace)
fig = axes.ravel()[0].figure
fig.savefig(self.filename)
self.metadata_db = shelve.open(metadata_db_filename)
self.metadata_db[self.filename] = {'type': 'sir', 'samples': n_samples,
'tune': n_tune,
'elapsed_time': t1,
'finished': dt.now().strftime("%d-%m-%Y_%H:%M:%S"),
'likelihood': likelihood,
'prior': prior }
self.metadata_db.close()
class SIR_model_sunode():
def __init__(self, covid_data) :
# ------------------------- Covid_data object -----------------------#
self.covid_data = covid_data
# ------------------------- Setup SIR model, but has to be called explicitly to run ------------------------#
self.setup_SIR_model()
def SIR_sunode(self, t, y, p):
return {
'S': -p.lam * y.S * y.I,
'I': p.lam * y.S * y.I - p.mu * y.I,
}
def setup_SIR_model(self):
self.time_range = np.arange(0,len(self.covid_data.cases_obs),1)
self.I0 = self.covid_data.cases_obs[0]
self.S0 = self.covid_data.N - self.I0
self.S_init = self.S0 / self.covid_data.N
self.I_init = self.I0 / self.covid_data.N
self.cases_obs_scaled = self.covid_data.cases_obs / self.covid_data.N
def run_SIR_model(self, n_samples, n_tune, likelihood, prior):
# ------------------------- Metadata --------------------------------#
now = dt.now()
timenow = now.strftime("%d-%m-%Y_%H:%M:%S")
self.filename = 'sir_' + self.covid_data.data_begin.replace('/','-') + '_' + \
self.covid_data.data_end.replace('/','-') + '_' + timenow
self.likelihood = likelihood
self.n_samples = n_samples
self.n_tune = n_tune
self.likelihood = likelihood
self.prior = prior
# ------------------------ Write out metadata while the model is running -------------------#
metadata_db_filename = 'metadata_db.db'
t = time.time()
with pm.Model() as model4:
sigma = pm.HalfCauchy('sigma', self.likelihood['sigma'], shape=1)
lam_mu = np.log(self.prior['lam']) + self.prior['lambda_std']**2
mu_mu = np.log(self.prior['mu']) + self.prior['mu_std']**2
lam = pm.Lognormal('lambda', lam_mu , self.prior['lambda_std']) # 1.5, 1.5
mu = pm.Lognormal('mu', mu_mu, self.prior['mu_std']) # 1.5, 1.5
res, _, problem, solver, _, _ = sunode.wrappers.as_theano.solve_ivp(
y0={
# The initial conditions of the ode. Each variable
# needs to specify a theano or numpy variable and a shape.
# This dict can be nested.
'S': (self.S_init, ()),
'I': (self.I_init, ()),},
params={
# Each parameter of the ode. sunode will only compute derivatives
# with respect to theano variables. The shape needs to be specified
# as well. It it infered automatically for numpy variables.
# This dict can be nested.
'lam': (lam, ()),
'mu': (mu, ()),
'_dummy': (np.array(1.), ())},
# A functions that computes the right-hand-side of the ode using
# sympy variables.
rhs=self.SIR_sunode,
# The time points where we want to access the solution
tvals=self.time_range,
t0=self.time_range[0]
)
if(likelihood['distribution'] == 'lognormal'):
I = pm.Lognormal('I', mu=res['I'], sigma=sigma, observed=self.cases_obs_scaled)
elif(likelihood['distribution'] == 'normal'):
I = pm.Normal('I', mu=res['I'], sigma=sigma, observed=self.cases_obs_scaled)
elif(likelihood['distribution'] == 'students-t'):
I = pm.StudentT( "I", nu=likelihood['nu'], # likelihood distribution of the data
mu=res['I'], # likelihood distribution mean, these are the predictions from SIR
sigma=sigma,
observed=self.cases_obs_scaled
)
theano.printing.Print('S')(res['S'])
print('Problem',problem)
print('Solver',solver)
R = 1 - (res['I'] + res['S'])
#S = 1 - (res['I'][1:])
#theano.printing.Print('R')(R)
R0 = pm.Deterministic('R0',lam/mu)
step = pm.Metropolis()
trace = pm.sample(self.n_samples, tune=self.n_tune, chains=4, cores=4)
data = az.from_pymc3(trace=trace)
t1 = time.time() - t
az.plot_posterior(data, round_to=2, point_estimate='mode', credible_interval=0.95)
axes = az.plot_trace(trace)
fig = axes.ravel()[0].figure
fig.savefig(self.filename)
fig = ff.create_distplot([trace['R0']], bin_size=0.5, group_labels=['x'])
# Add title
fig.update_layout(title_text='Curve and Rug Plot')
fig.update_xaxes(range=[0,7])
self.metadata_db = shelve.open(metadata_db_filename)
self.metadata_db[self.filename] = {'type': 'sir', 'samples': n_samples,
'tune': n_tune,
'elapsed_time': t1,
'finished': dt.now().strftime("%d-%m-%Y_%H:%M:%S"),
'likelihood': likelihood,
'prior': prior }
self.metadata_db.close()
return(fig)
Forecast cases into the future¶
The number of days predicted depends on the variable ‘num_days_to_predict’ that is set. This uses the uncertainty from the MCMC simulation to give us estimates for the confidence intervals
Notes for Theano¶
Create variables by assigning a datatype
from theano import tensor theano.config.compute_test_value = 'warn' I_begin = tensor.dscalar('I_begin') I_begin.tag.test_value = np.log(cases_obs[0]).astype(theano.config.floatX) S_begin = tensor.dscalar('S_begin') S_begin = N_val - I_begin print(S_begin) Out -> Elemwise{sub,no_inplace}.0
Assign a test value since theano performs lazy evaluation and therefore cannot be evaluated uness you use
a. eval() method
I_begin = tensor.dscalar('I_begin') I_begin.eval({I_begin: np.log(cases_obs[0])})
b. test_value to intiialize a variable and run it, values can be assigned from a numpy array by casting it with ‘.astype(THEANO_DATATYPE)’
λ = tensor.dscalar('λ') λ.tag.test_value = np.log(0.4).astype(theano.config.floatX) μ = tensor.dscalar('μ') μ.tag.test_value = np.log(1/8).astype(theano.config.floatX)
Print the variable value using ‘theano.printing.Print’
S = SIR_model(λ= Lambda_in, μ=mu_in) theano.printing.Print('Value of S')(S)