Source code for tacoma.epidemics

# -*- coding: utf-8 -*-
"""
This module provides functions related to the simulation and
measurement of epidemics.
"""

import numpy as np
import tacoma as tc

from _tacoma import gillespie_QS_SIS_on_EdgeActivityModel, gillespie_QS_SIS_on_edge_lists

from scipy.sparse import csr_matrix
from scipy.sparse import eye
from scipy.linalg import expm
from scipy.sparse.linalg import eigs
from scipy.optimize import minimize


[docs]def simulate_and_measure_i_inf(temporal_network_or_model,epidemic_object,t_equilibrate,is_static=False,verbose=False): """Get the equilibrium ratio of infected. Parameters ---------- temporal_network_or_model : :class:`_tacoma.edge_changes`, :class:`_tacoma.edge_lists`, :class:`_tacoma.edge_changes_with_histograms`, :class:`_tacoma.edge_lists_with_histograms`, or :class:`_tacoma.EdgeActivityModel`. An instance of a temporal network or network model. epidemic_object : :class:`_tacoma.SI`, :class:`_tacoma.SIS`, :class:`_tacoma.SIR`, :class:`_tacoma.SIRS`, :class:`_tacoma.node_based_SIS` An initialized epidemic object. t_equilibrate: float Time passed after t0 after which to start measuring. is_static : bool, default : False The algorithm works a bit differently if it knows that the network is actually static. It works only with instances of :class:`_tacoma.edge_lists`. verbose: bool, optional Be chatty. Returns ------- i_inf: float Temporal average over the ratio of infected after equilibration. i_inf_std: float RMSE of the ratio of infected. R0: float As measured after equilibration """ tn = temporal_network_or_model eo = epidemic_object N = tn.N t_eq = t_equilibrate t_run = eo.t_simulation t_run_total = t_eq + t_run tc.gillespie_epidemics(tn,eo,is_static=is_static,verbose=verbose) t = np.array(eo.time) I = np.array(eo.I,dtype=float) / N r0 = np.array(eo.R0) t0 = t[0] if t[-1]>t0+t_eq: ndcs = np.where(t>=t_eq+t0)[0] ti, i = t[ndcs], I[ndcs] i_inf = tc.time_average(ti,i,tmax=t0+t_run_total) i_inf_std = np.sqrt(tc.time_average(ti,(i-i_inf)**2,tmax=t0+t_run_total)) r0 = r0[t>=t_eq+t0] this_t = t[t>=t_eq+t0] R0 = tc.time_average(this_t,r0,tmax=t0+t_run_total) else: i_inf = 0 i_inf_std = 0. R0 = r0[-1] result = ( i_inf, i_inf_std, R0 ) return result
[docs]def simulate_quasi_stationary_SIS_on_model(model, qs_sis, verbose=False): while True: gillespie_QS_SIS_on_EdgeActivityModel(model, qs_sis, verbose=verbose) if qs_sis.ended_in_absorbing_state(): node_status, G = qs_sis.get_random_configuration() qs_sis.set_node_configuration(qs_sis.last_active_time, node_status) model.set_initial_configuration(qs_sis.last_active_time, G) else: break return qs_sis.get_infection_observables()
[docs]def simulate_quasi_stationary_SIS_on_static_network(network, qs_sis, verbose=False): t = float(qs_sis.last_active_time) while True: gillespie_QS_SIS_on_edge_lists(network, qs_sis, is_static=True, verbose=verbose) if qs_sis.ended_in_absorbing_state(): delta_t = qs_sis.last_active_time - t node_status, G = qs_sis.get_random_configuration() qs_sis.set_initial_configuration(qs_sis.last_active_time, node_status) qs_sis.t_simulation -= delta_t #print(qs_sis.last_active_time, t, qs_sis.t_simulation) #sys.exit(1) new_t = [qs_sis.last_active_time] network.t = new_t t = qs_sis.last_active_time print("t = ", t,": died") else: break return qs_sis.get_infection_observables()
[docs]def get_SIS_critical_infection_rate(tn, recovery_rate, method='Powell', arpackmaxiter=10000, arpacktol=1e-9): res = minimize(lambda eta: (1-get_SIS_max_eigenvalue(tn, eta[0], recovery_rate, maxiter=arpackmaxiter, tol=arpacktol, ))**2, [1], method=method ) res = res.x return res, get_SIS_max_eigenvalue(tn, res, recovery_rate)
[docs]def get_SIS_critical_recovery_rate(tn, infection_rate, method='Powell', arpackmaxiter=10000, arpacktol=1e-9): res = minimize(lambda rho: (1-get_SIS_max_eigenvalue(tn, infection_rate, rho[0], maxiter=arpackmaxiter, tol=arpacktol, ))**2, [1], method=method ) res = res.x return res, get_SIS_max_eigenvalue(tn, infection_rate, res)
""" def get_SIS_epidemic_threshold(tn, infection_rate=None, recovery_rate=None): if infection_rate is None and recovery_rate is None: raise ValueError('Please provide either an infection rate (to find the critical recovery rate) (x)or a recovery rate (to find the critical infection rate).') elif infection_rate is not None and recovery_rate is not None: raise ValueError('Please provide either an infection rate (to find the critical recovery rate) (x)or a recovery rate (to find the critical infection rate), not both.') """
[docs]def get_SIS_max_eigenvalue(tn, infection_rate, recovery_rate, maxiter=10000, tol=1e-9): if type(tn) != tc.sparse_adjacency_matrices: raise ValueError('Please provide an instance of tacoma.sparse_adjacency_matrices') I = eye(tn.N, format=tn.adjacency_matrices[0].format) this_matrix = I.copy() rho = recovery_rate eta = infection_rate i = 0 for A in tn.adjacency_matrices: t0 = tn.t[i] if i+1 < len(tn.t): t1 = tn.t[i+1] else: t1 = tn.tmax dt = t1 - t0 this_matrix = expm((eta*A - rho*I)*dt).dot(this_matrix) T = this_matrix.tocsc() mus, _ = eigs(T, k=2, which='LR', maxiter=maxiter, tol=tol) mu_max = max(np.real(mus)) return mu_max
_EPI_S = 0 _EPI_I = 1 _EPI_R = 2
[docs]class SIR_weighted: infected = [] force_of_infection_on_node = [] node_status = [] rates = np.array([0.0,0.0]) def __init__(self,N,weighted_edge_tuples,infection_rate,recovery_rate,number_of_initially_infected): self.N = N self.infection_rate = infection_rate self.recovery_rate = recovery_rate self.infected = np.random.choice(N,replace=False,size=(number_of_initially_infected,)).tolist() self.recovered = [] self.node_status = np.array([ _EPI_S for n in range(N) ],dtype=int) for n in self.infected: self.node_status[n] = _EPI_I self.force_of_infection_on_node = np.array([ 0.0 for n in range(N) ]) self.graph = [ {} for n in range(N) ] for u, v, weight in weighted_edge_tuples: self.graph[u][v] = weight self.graph[v][u] = weight if self.node_status[u] == _EPI_S and self.node_status[v] == _EPI_I: self.force_of_infection_on_node[u] += weight elif self.node_status[v] == _EPI_S and self.node_status[u] == _EPI_I: self.force_of_infection_on_node[v] += weight
[docs] def evaluate_rates(self): self.rates[0] = self.force_of_infection_on_node.sum() * self.infection_rate self.rates[1] = len(self.infected) * self.recovery_rate return self.rates
[docs] def recovery_event(self): recovering_node = self.infected.pop( np.random.choice(len(self.infected)) ) self.node_status[recovering_node] = _EPI_R self.force_of_infection_on_node[recovering_node] = 0.0 for neigh, weight in self.graph[recovering_node].items(): if self.node_status[neigh] == _EPI_S: self.force_of_infection_on_node[neigh] -= weight if self.force_of_infection_on_node[neigh] < 0.0: self.force_of_infection_on_node[neigh] = 0.0 self.recovered.append(recovering_node)
[docs] def infection_event(self): infecting_node = np.random.choice(self.N, p=self.force_of_infection_on_node/self.force_of_infection_on_node.sum() ) self.node_status[infecting_node] = _EPI_I self.force_of_infection_on_node[infecting_node] = 0.0 for neigh, weight in self.graph[infecting_node].items(): if self.node_status[neigh] == _EPI_S: self.force_of_infection_on_node[neigh] += weight self.infected.append(infecting_node)
[docs] def simulation(self,tmax): t = 0.0 time = [] I = [] R = [] while (t < tmax) and len(self.infected) > 0: time.append(t) I.append(len(self.infected)) R.append(len(self.recovered)) r = self.evaluate_rates() Lambda = r.sum() tau = np.random.exponential(1.0/Lambda) event = np.random.choice(len(r),p=r/Lambda) if event == 0: self.infection_event() else: self.recovery_event() t += tau if len(self.infected) == 0 and (t < tmax): time.append(t) I.append(0) R.append(len(self.recovered)) return np.array(time), np.array(I, dtype=float), np.array(R, dtype=float)
[docs]class SIS_weighted: infected = [] force_of_infection_on_node = [] node_status = [] rates = np.array([0.0,0.0]) def __init__(self,N,weighted_edge_tuples,infection_rate,recovery_rate,number_of_initially_infected): self.N = N self.infection_rate = infection_rate self.recovery_rate = recovery_rate self.infected = np.random.choice(N,replace=False,size=(number_of_initially_infected,)).tolist() self.node_status = np.array([ _EPI_S for n in range(N) ],dtype=int) for n in self.infected: self.node_status[n] = _EPI_I self.force_of_infection_on_node = np.array([ 0.0 for n in range(N) ]) self.graph = [ {} for n in range(N) ] for u, v, weight in weighted_edge_tuples: self.graph[u][v] = weight self.graph[v][u] = weight if self.node_status[u] == _EPI_S and self.node_status[v] == _EPI_I: self.force_of_infection_on_node[u] += weight elif self.node_status[v] == _EPI_S and self.node_status[u] == _EPI_I: self.force_of_infection_on_node[v] += weight
[docs] def evaluate_rates(self): self.rates[0] = self.force_of_infection_on_node.sum() * self.infection_rate self.rates[1] = len(self.infected) * self.recovery_rate return self.rates
[docs] def recovery_event(self): recovering_node = self.infected.pop( np.random.choice(len(self.infected)) ) self.node_status[recovering_node] = _EPI_S self.force_of_infection_on_node[recovering_node] = 0.0 for neigh, weight in self.graph[recovering_node].items(): if self.node_status[neigh] == _EPI_S: self.force_of_infection_on_node[neigh] -= weight if self.force_of_infection_on_node[neigh] < 0.0: self.force_of_infection_on_node[neigh] = 0.0 elif self.node_status[neigh] == _EPI_I: self.force_of_infection_on_node[recovering_node] += weight
[docs] def infection_event(self): infecting_node = np.random.choice(self.N, p=self.force_of_infection_on_node/self.force_of_infection_on_node.sum() ) self.node_status[infecting_node] = _EPI_I self.force_of_infection_on_node[infecting_node] = 0.0 for neigh, weight in self.graph[infecting_node].items(): if self.node_status[neigh] == _EPI_S: self.force_of_infection_on_node[neigh] += weight self.infected.append(infecting_node)
[docs] def simulation(self,tmax): t = 0.0 time = [] I = [] while (t < tmax) and len(self.infected) > 0: time.append(t) I.append(len(self.infected)) r = self.evaluate_rates() R = r.sum() tau = np.random.exponential(1.0/R) event = np.random.choice(len(r),p=r/R) if event == 0: self.infection_event() else: self.recovery_event() t += tau if len(self.infected) == 0 and (t < tmax): time.append(t) I.append(0) return np.array(time), np.array(I, dtype=float)
if __name__ == "__main__": N = 100 k = 10 omega = 1.6 recovery_rate = 0.1 R0 = 10 t_run_total = 1000 AM = tc.EdgeActivityModel(N, k/(N-1.), omega, t0 = 2000 ) infection_rate = R0 / k * recovery_rate SIS = tc.SIS(N,t_run_total,infection_rate,recovery_rate, number_of_initially_infected=N, sampling_dt=0.0, ) print(simulate_and_measure_i_inf(AM, SIS,t_equilibrate=900))