- Author:
- Shelley Fong <sfon036@UoA.auckland.ac.nz>
- Date:
- 2022-05-19 16:11:42+12:00
- Desc:
- removing smooth muscle contraction module
- Permanent Source URI:
- https://models.physiomeproject.org/workspace/860/rawfile/753e301e529312d3092d054f6bceca72ca116f56/parameter_finder/find_BG_parameters_composite.py
# find bond-graph parameters for a system with multiple modules
# require separate folders within this directory containing each module's kinetic_parameters.py file and data files
# write out cellml file in text form
# prints out error between given kinetic parameters, and parameters found back-transforming the bond-graph parameters
import os
import sys
import importlib
import json
import csv
import math
import numpy as np
from scipy.linalg import null_space
import sympy
from sympy import Matrix, S, nsimplify
from fractions import Fraction
import time
def read_IDs(path):
data = []
with open(path,'r') as f:
reader = csv.reader(f)
for row in reader:
data.append(row[0])
f.close()
return data
def load_matrix(stoich_path):
matrix = []
with open(stoich_path,'r') as f:
reader = csv.reader(f,delimiter=',')
for row in reader:
matrix.append([float(r) for r in row])
f.close()
return matrix
# def rational_nullspace(A, max_denom = 10):
# v = null_space(A)
# vFrac = [[Fraction(num).limit_denominator(max_denominator=max_denom) for num in row] for row in v]
# vRat = [] #np.zeros([len(vFrac),len(vFrac[0])])
# if not v.any():
# return []
# for row in vFrac:
# largest_denom = max([res.denominator for res in row])
# vRat.append( [vi.numerator for vi in row] )
# return vRat
def calcT(I_vec,num_rows):
num_cols = len(I_vec)
T = np.zeros([num_rows,num_cols])
for i in range(num_cols):
T[I_vec[i]][i] = 1
return T
if __name__ == "__main__":
tStart = time.time()
# Set directories
current_dir = os.getcwd()
main_dir = os.path.dirname(current_dir)
output_dir = current_dir + '\output'
whole_name = main_dir.split('\\')[-1]
if not os.path.exists(output_dir):
os.mkdir(output_dir)
R = 8.314 # unit J / mol / K
T = 310
F = 96485
ion_names = ['Na','Ca','K']
ion_names = [i+'_o' for i in ion_names] + [i+'_i' for i in ion_names] + [i+'_SR' for i in ion_names]
ion_names += ['H','MgATP','MgADP','Pi']
# define constants and volumes
cNao = 140 # unit mM
cNai = 10 # unit mM
cKo = 5.4
cKi = 145
cCai = 0.00012
cCa_di = cCai #1.7374E-04 # mM
cCa_sr = 5.5545E-01 # mM
N_A = 6.022e23
A_cap = 2*1.1e-5 # assume same membrane folding as LRd #1.534e-4 # Unit cm ^ 2
Cm = 60e-6/3.71 # Unit microF # scaled Kernik value to be 1 pL (for Yang)
V_myo = 1 # pL, for Kernik iPSCell
V_o = 0.12*V_myo/0.88 # Using cleft volume calc from LRd
V_di = V_myo * 0.0539 # diadic volume
V_sr = V_myo * 0.06 # SR volume
V_isr = V_myo + V_sr # Intracellular volume + SR volume
V = dict()
V['V_myo'] = V_myo
V['V_o'] = V_o
V['V_di'] = V_di
V['V_SR'] = V_sr
V['V_ISR'] = V_isr
V['F'] = F
V['R'] = R
V['T'] = T
V['Cm'] = Cm
V['N_A'] = N_A
V['A_cap'] = A_cap
V['cNao'] = cNao
V['cNai'] = cNai
V['cKi'] = cKi
V['cKo'] = cKo
V['c_di'] = cCa_di
V['c_sr'] = cCa_sr
# store channel densities as 1/um2
chDensity = dict()
chDensity['fast_Na'] = 16
## Load stoichiometric matrices and kinetic rate constants
exclude_folders = ['.git', 'exposure','parameter_finder','python','units_and_constants','.idea']
subsystem_names = ['individual']
if True:
subsystem_names = next(os.walk(main_dir))[1]
exclude_folders2 = exclude_folders.copy()
# exclude_folders.append('CMDN_buffer')
subsystem_names = [s for s in subsystem_names if s not in exclude_folders2]
else:
# Pan channels only, with slight new additions
subsystem_names = ['K1'] #
subsystem_names.sort()
num_subsystems = len(subsystem_names)
sys_struct = {c:{} for c in subsystem_names}
rxnIDs = []
Knames = []
znames = []
zvals = []
Kname_modules = dict()
for i_system in range(num_subsystems):
forward_mat_path = 'data\\all_forward_matrix.txt'
sys_name = subsystem_names[i_system]
sys_dir = main_dir + '\\' + sys_name +'\parameter_finder\\'
os.chdir(sys_dir)
reverse_mat_path = 'data\\all_reverse_matrix.txt'
N_f = load_matrix(forward_mat_path)
N_r = load_matrix(reverse_mat_path)
sys_struct[sys_name]['N_f'] = N_f
sys_struct[sys_name]['N_r'] = N_r
# print(subsystem_names[i_system])
dims = dict()
dims['num_rows'] = len(N_f)
dims['num_cols'] = len(N_f[0])
I = np.identity(dims['num_cols'])
M = np.append(np.append(I, np.transpose(N_f),1), np.append(I, np.transpose(N_r),1),0)
sys.path.append(sys_dir)
try:
V['numChannels'] = chDensity[sys_name]*V['A_cap']*1e8 # convert to um2
print('Num channels for ',sys_name,' is', V['numChannels'])
print ('\t\tfmol is', 1e15*V['numChannels']/N_A)
except:
print('No channel density for ',sys_name)
globals()['kp_' + sys_name] = importlib.import_module('kinetic_parameters_' + sys_name)
[k_kinetic, N_cT, K_C, W] = globals()['kp_' + sys_name].kinetic_parameters(M,True,dims, V)
sys_struct[sys_name]['kfkr'] = k_kinetic
sys_struct[sys_name]['Kc'] = K_C
sys_struct[sys_name]['N_cT'] = N_cT
sys_struct[sys_name]['W'] = W[dims['num_cols']:]
rxnID = read_IDs('data\\rxnID.txt')
rxnIDs.extend(rxnID)
sys_struct[sys_name]['rxnID'] = rxnID
Kname = read_IDs('data\\Kname.txt')
Knames.extend(Kname)
sys_struct[sys_name]['Kname'] = Kname
try:
zname = read_IDs('data\\zname.txt')
zval = read_IDs('data\\z_value.txt')
for iz,z in enumerate(zname):
if z not in znames:
znames.append(z)
zvals.append(zval[iz])
except:
pass
Kunique = []
keep_Kunique_IDs = []
Krepeats = []
for ii, ik in enumerate(Knames):
# if ~any(strcmp(Kunique,ik)):
if ik not in Kunique:
Kunique.append(ik)
keep_Kunique_IDs.append(ii)
else:
Krepeats.append(ik)
# id = Kunique.index(ik)
# keep_Kunique_IDs.append(id)
kapparepeats = []
kappaunique = []
for ik in rxnIDs:
if ik not in kappaunique:
kappaunique.append(ik)
else:
kapparepeats.append(ik)
os.chdir(current_dir)
# relations between submodule to whole module
for name in subsystem_names:
ids = [Kunique.index(kid) for kid in sys_struct[name]['Kname']]
sys_struct[name]['I_vec'] = ids
num_rows = len(Kunique) #max(sys_struct[subsystem_names[-1]]['I_vec'])+1
N_f = []
N_r = []
for sys_name in subsystem_names:
print(sys_name)
T = calcT(sys_struct[sys_name]['I_vec'],num_rows)
sys_struct[sys_name]['T'] = T
new_f = np.matmul(T,sys_struct[sys_name]['N_f'])
new_r = np.matmul(T,sys_struct[sys_name]['N_r'])
if not len(N_f):
N_f = new_f
N_r = new_r
else:
N_f = np.append(N_f, new_f,1)
N_r = np.append(N_r, new_r,1)
N_fT = np.transpose(N_f)
N_rT = np.transpose(N_r)
N = N_r - N_f
N_T = N_rT - N_fT
num_cols = len(N[0])
I = np.identity(num_cols)
M = np.append(np.append(I, N_fT,1), np.append(I, N_rT,1),0)
# M_rref = sympy.Matrix(M).rref()
## Set up the vectors for kinetic rate constants
kf = []
kr = []
W = []
for sys_name in subsystem_names:
nrx = int(len(sys_struct[sys_name]['kfkr'])/2)
kf.extend(sys_struct[sys_name]['kfkr'][:nrx])
kr.extend(sys_struct[sys_name]['kfkr'][nrx:])
W.extend(sys_struct[sys_name]['W'])
k_kinetic = kf +kr
# retain only the elements of W corresponding to Kunique
W = [W[ik] for ik in keep_Kunique_IDs]
W = [1]*num_cols + W
# W = list(np.append([1]*len(N[0]), [V_myo]*num_rows)) # THIS IS WRONG
lambda_expo = np.matmul(np.linalg.pinv(M), [math.log(k) for k in k_kinetic])
lambdaW = [math.exp(l) for l in lambda_expo]
lambdak = [lambdaW[i]/W[i] for i in range(len(W))]
kappa = lambdak[:len(N[0])]
K = lambdak[len(N[0]):]
file = open(output_dir + '/all_parameters_out.json', 'w')
data = { "K": K, "kappa": kappa, "k_kinetic": k_kinetic }
json.dump(data, file)
# Checks
if False:
N_rref = sympy.Matrix(N).rref()
R = nsimplify(Matrix(N), rational=True).nullspace() #rational_nullspace(N, max_denom=len(N[0]))
if R:
R = np.transpose(np.array(R).astype(np.float64))[0]
if False:
# Check that there is a detailed balance constraint
Z = nsimplify(Matrix(M), rational=True).nullspace() #rational_nullspace(M, 2)
if Z:
Z = np.transpose(np.array(Z).astype(np.float64))[0]
k_est = np.matmul(M,[math.log(k) for k in lambdaW])
k_est = [math.exp(k) for k in k_est]
diff = [(k_kinetic[i] - k_est[i])/k_kinetic[i] for i in range(len(k_kinetic))]
error = np.sum([abs(d) for d in diff])
K_eq = [kf[i]/kr[i] for i in range(len(kr))]
try:
zero_est = np.matmul(np.transpose(R),K_eq)
zero_est_log = np.matmul(np.transpose(R),[math.log(k) for k in K_eq])
except:
print('undefined R nullspace')
if True:
# list of all K and kappa in directory
all_subsystem_names = next(os.walk(main_dir))[1]
all_exclude_folders = exclude_folders #+ ['K']
all_subsystem_names = [s for s in all_subsystem_names if s not in all_exclude_folders]
rem_rxnIDs = []
rem_Knames = []
all_Kname = []
all_rxnID = []
for sname in all_subsystem_names:
sys_dir = main_dir + '\\' + sname +'\parameter_finder\\'
rxnID = read_IDs(sys_dir + 'data\\rxnID.txt')
Kname = read_IDs(sys_dir + 'data\\Kname.txt')
for k in Kname:
if (k not in Kunique) and (k not in rem_Knames):
rem_Knames.append(k)
for k in rxnID:
if k not in rxnIDs:
rem_rxnIDs.append(k)
# all_Kname.append(Kname)
# all_rxnID.append(rxnID)
# all_Kname = []
# all_kappaunique = []
# for ii, ik in enumerate(all_Kname):
# if ik not in all_Kunique:
# all_Kunique.append(ik)
# for ik in all_rxnID:
# if ik not in all_kappaunique:
# all_kappaunique.append(ik)
# ### print outputs ###
# include print of remaining kappa/K that are not included: make them zero
Kkappa_values = {c:1e-6 for c in Kunique+rxnIDs}
for ik in range(len(kappa)):
print('var kappa_%s: fmol_per_sec {init: %g, pub: out};' %(rxnIDs[ik],kappa[ik]))
for r in rem_rxnIDs:
print('var kappa_%s: fmol_per_sec {init: 0, pub: out};' %(r))
for ik in range(len(Kunique)):
print('var K_%s: per_fmol {init: %g, pub: out};' %(Kunique[ik],K[ik]))
for k in rem_Knames:
print('var K_%s: per_fmol {init: 1e-6, pub: out};' %(k))
print('\n')
for ik in range(len(znames)):
print('var %s: dimensionless {init: %s, pub: out};' % (znames[ik], zvals[ik]))
# print moles of gate particles if included in system
print('\n')
for sys_name in subsystem_names:
# if sys_name not in Pan_channel_names:
gate_count = 0
for ig in sys_struct[sys_name]['Kname']:
if ig not in ion_names:
gate_count += 1
if gate_count > 0 and sys_name in chDensity.keys():
for ig in sys_struct[sys_name]['Kname']:
gate_mol = (1e15*chDensity[sys_name]*V['A_cap']*1e8/N_A)/gate_count
if ig not in ion_names:
print('var q_%s: fmol {init: %g, pub: out};' %(ig, gate_mol))
if False:
for ig in sys_struct[sys_name]['Kname']:
if ig not in ion_names:
print('vars q_%s and q_%s;' %(ig, ig))
print('\n')
print('error = ', error)
# initialise struct for storing modules contributing to a given K
for ik in range(len(Kunique)):
Kname_modules[Kunique[ik]] = []
for sys_name in subsystem_names:
modKname = sys_struct[sys_name]['Kname']
for ik in range(len(modKname)):
Kname_modules[modKname[ik]].append(sys_name)
# write out CellML code
if True:
cellmlfilepath = output_dir + '\\TEMP.cellml.txt'
with open(cellmlfilepath, 'w') as cid:
cid.write('def model %s as\n def import using "units_and_constants/units_BG.cellml" for\n\
unit mM using unit mM;\nunit fmol using unit fmol;\nunit per_fmol using unit per_fmol;\n\
unit J_per_mol using unit J_per_mol;\nunit fmol_per_sec using unit fmol_per_sec;\n\
unit C_per_mol using unit C_per_mol;\n unit J_per_C using unit J_per_C;\n\
unit microm3 using unit microm3;\n unit fF using unit fF;\n\
unit fC using unit fC;\n unit fA using unit fA;\n\
unit per_second using unit per_second;\n unit millivolt using unit millivolt;\n\
unit per_sec using unit per_sec;\n unit J_per_K_per_mol using unit J_per_K_per_mol;\n\
unit fmol_per_L using unit fmol_per_L;\n unit fmol_per_L_per_sec using unit fmol_per_L_per_sec;\n\
unit per_sec_per_fmol_per_L using unit per_sec_per_fmol_per_L;\n unit uM using unit uM;\n\
unit mM_per_sec using unit mM_per_sec;\n unit uM_per_sec using unit uM_per_sec;\n\
unit pL using unit pL;\n unit m_to_u using unit m_to_u;\n enddef;\n' %(whole_name))
cid.write('def import using "units_and_constants/constants_BG.cellml" for\n\
comp constants using comp constants;\nenddef;\n\n')
for module in subsystem_names:
cid.write('def import using "%s/BG_%s.cellml" for\ncomp %s using comp %s;\nenddef;\n' % (
module, module, module, module))
cid.write('\ndef comp BG_parameters as\n')
for ik in range(len(kappa)):
cid.write('var kappa_%s: fmol_per_sec {init: %g, pub: out};\n' % (rxnIDs[ik], kappa[ik]))
for ik in range(len(Kunique)):
cid.write('var K_%s: per_fmol {init: %g, pub: out};\n' % (Kunique[ik], K[ik]))
cid.write('enddef;\n')
cid.write(' def comp environment as\n\
var t: second {pub: out};\n\
var vol_myo: pL {init: %g, pub: out};\n\
var freq: dimensionless {init: 500};\n' %V_myo)
for j in range(len(K)):
cid.write('var q_%s: fmol {init: 1e-888, pub: out};\n' % Kunique[j])
for module in subsystem_names:
modRx = sys_struct[module]['rxnID']
cid.write('\n// %s imports\n' % module)
for j in modRx:
cid.write('var v_%s: fmol_per_sec {pub: in};\n' % (j))
cid.write('\n')
cid.write('\n')
for kun in Kunique:
cid.write('ode(q_%s, time) =' % (kun))
for mod in Kname_modules[kun]:
cid.write(' + v_m%s ' % (mod))
cid.write(';\n')
cid.write('enddef;\n')
cid.write('\n')
for module in subsystem_names:
modKname = sys_struct[module]['Kname']
modRx = sys_struct[module]['rxnID']
cid.write('def map between environment and %s for\n' % module)
cid.write('vars time and time;\n')
for mod in modKname:
cid.write('vars q_%s and q_%s;\n' % (mod, mod))
for mod in modRx:
cid.write('vars v_%s and v_%s;\n' % (mod, mod))
cid.write('vars q_mem and q_mem;\n')
cid.write('vars I_mem and q_mem_m%s;\n' %module)
cid.write('enddef;\n\n')
for module in subsystem_names:
modKname = sys_struct[module]['Kname']
modrxnID = sys_struct[module]['rxnID']
cid.write('def map between BG_parameters and %s for\n' % (module))
for ik in modrxnID:
cid.write('vars kappa_%s and kappa_%s;\n' % (ik, ik))
for mod in modKname:
cid.write('vars K_%s and K_%s;\n' % (mod, mod))
cid.write('enddef;\n')
cid.write('\n')
for module in subsystem_names:
cid.write('def map between constants and %s for\n' % (module))
cid.write('\tvars R and R;\n\tvars T and T;\nenddef;\n')
cid.write('\nenddef;\n')
cid.close()
elapsed = time.time() - tStart
print('Time elapsed: ',elapsed)