Skip to content

Dynamical Factor Models (DFM) Implementation (GSOC 2025) #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,296 changes: 1,296 additions & 0 deletions notebooks/Making a Custom DFM.ipynb

Large diffs are not rendered by default.

574 changes: 574 additions & 0 deletions pymc_extras/statespace/models/DFM.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pymc_extras/statespace/utils/constants.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,8 @@
# Dimension (coordinate) names used by the seasonal ARIMA/ETS components.
SEASONAL_AR_PARAM_DIM = "seasonal_lag_ar"
SEASONAL_MA_PARAM_DIM = "seasonal_lag_ma"
ETS_SEASONAL_DIM = "seasonal_lag"
# Dimension names for the dynamic factor model: one coordinate per latent
# factor, and one per AR lag of the observation errors.
FACTOR_DIM = "factor"
ERROR_AR_PARAM_DIM = "error_ar_lag"

# Matrix/vector names that are never allowed to be time varying
# (presumably because they parameterize the t=0 state — confirm against usage).
NEVER_TIME_VARYING = ["initial_state", "initial_state_cov", "a0", "P0"]
VECTOR_VALUED = ["initial_state", "state_intercept", "obs_intercept", "a0", "c", "d"]
File renamed without changes.
File renamed without changes.
File renamed without changes.
227 changes: 227 additions & 0 deletions tests/statespace/test_DFM.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytensor.tensor as pt
import pytest
import statsmodels.api as sm

from numpy.testing import assert_allclose
from statsmodels.tsa.statespace.dynamic_factor import DynamicFactor

from pymc_extras.statespace.models.DFM import BayesianDynamicFactor

floatX = pytensor.config.floatX


@pytest.fixture(scope="session")
def data():
    """Load the processed statsmodels macro dataset once per test session.

    The frame is cast to pytensor's ``floatX`` and its index frequency is set
    explicitly so downstream statespace models see a regular time index.
    """
    csv_path = "tests/statespace/test_data/statsmodels_macrodata_processed.csv"
    frame = pd.read_csv(csv_path, index_col=0, parse_dates=True)
    frame = frame.astype(floatX)
    frame.index.freq = frame.index.inferred_freq
    return frame


@pytest.mark.parametrize("k_factors", [1, 2])
@pytest.mark.parametrize("factor_order", [0, 1, 2, 3])
@pytest.mark.parametrize("error_order", [0, 1, 2, 3])
def test_dfm_matrices_match_statsmodels(data, k_factors, factor_order, error_order):
    """Compare every statespace matrix built by ``BayesianDynamicFactor``
    against the corresponding matrix from statsmodels' ``DynamicFactor``
    for the same (k_factors, factor_order, error_order) configuration.
    """
    y = data
    k_endog = y.shape[1]

    with pm.Model():
        dfm = BayesianDynamicFactor(
            k_factors=k_factors,
            factor_order=factor_order,
            k_endog=k_endog,
            error_order=error_order,
            error_var=False,
            measurement_error=True,
            verbose=True,
        )
        # State vector layout: stacked factor lags (at least one lag, even
        # when factor_order == 0) followed by AR error states when
        # error_order > 0.
        factor_part = max(1, factor_order) * k_factors
        error_part = error_order * k_endog if error_order > 0 else 0
        k_states = factor_part + error_part

        pm.Normal("x0", mu=0.0, sigma=1.0, shape=(k_states,))
        pm.HalfNormal("P0", sigma=1.0, shape=(k_states, k_states))

        pm.Normal("factor_loadings", mu=0.0, sigma=1.0, shape=(k_endog, k_factors))
        pm.Normal("factor_ar", mu=0.0, sigma=1.0, shape=(k_factors, factor_order))
        pm.HalfNormal("factor_sigma", sigma=1.0, shape=(k_factors,))

        pm.Normal("error_ar", mu=0.0, sigma=1.0, shape=(k_endog, error_order))
        pm.HalfNormal("error_sigma", sigma=1.0, shape=(k_endog,))

        pm.HalfNormal("sigma_obs", sigma=1.0, shape=(k_endog,))

        dfm.build_statespace_graph(data=data)
        ssm_pymc = dfm.unpack_statespace()

    # Build the reference statsmodels model with a known initialization so
    # both models start from the same state mean and covariance.
    sm_dfm = DynamicFactor(
        endog=y,
        k_factors=k_factors,
        factor_order=factor_order,
        error_order=error_order,
    )
    sm_dfm.initialize_known(initial_state=np.zeros(k_states), initial_state_cov=np.eye(k_states))
    sm_matrices = sm_dfm.ssm

    # Compile the PyMC statespace tensors so they can be evaluated on
    # concrete dummy parameter values.
    pymc_inputs = list(pytensor.graph.basic.explicit_graph_inputs(ssm_pymc))
    f_ssm = pytensor.function(pymc_inputs, ssm_pymc)

    # Dummy inputs: identity matrices for covariance-like inputs (so they
    # are valid PSD matrices), 0.5 everywhere else.
    test_vals = {}
    for v in pymc_inputs:
        shape = v.type.shape
        if len(shape) == 0:
            test_vals[v.name] = 0.5
        elif "cov" in v.name:
            # Matches "initial_state_cov", "state_cov", "obs_cov", etc.
            test_vals[v.name] = np.eye(shape[0], shape[1], dtype=floatX)
        else:
            test_vals[v.name] = np.full(shape, 0.5, dtype=floatX)

    pymc_matrices = f_ssm(**test_vals)

    # The matrices are stored in the same order in the PyMC model and the
    # statsmodels model, so a positional zip pairs them correctly.
    matrix_names = [
        "initial_state",
        "initial_state_cov",
        "state_intercept",
        "obs_intercept",
        "transition",
        "design",
        "selection",
        "obs_cov",
        "state_cov",
    ]
    for pm_mat, sm_mat, name in zip(pymc_matrices, sm_matrices, matrix_names):
        assert_allclose(pm_mat, sm_mat, atol=1e-5, err_msg=f"Mismatch in matrix {name}")


# @pytest.mark.parametrize("k_factors", [1])
# @pytest.mark.parametrize("factor_order", [2])
# @pytest.mark.parametrize("error_order", [1])
# def test_loglike_matches_statsmodels(data, k_factors, factor_order, error_order):
# """
# Tests if the log-likelihood calculated by the PyMC statespace model matches
# the one from statsmodels when given the same parameters.
# """
# k_endog = data.shape[1]
# endog_names = data.columns.tolist() # ['realgdp', 'realcons', 'realinv']

# sm_dfm = DynamicFactor(
# endog=data,
# k_factors=k_factors,
# factor_order=factor_order,
# error_order=error_order,
# )
# print(sm_dfm.param_names)

# test_params = {
# # Loadings are now separate parameters
# "loading.f1.realgdp": 0.9,
# "loading.f1.realcons": -0.8,
# "loading.f1.realinv": 0.7,
# # Factor AR coefficients
# "L1.f1.f1": 0.5,
# "L2.f1.f1": 0.2,
# # Error AR coefficients use parenthesis
# "L1.e(realgdp).e(realgdp)": 0.4,
# "L1.e(realcons).e(realcons)": 0.2,
# "L1.e(realinv).e(realinv)": -0.1,
# # Error variances
# "sigma2.realgdp": 0.5,
# "sigma2.realcons": 0.4,
# "sigma2.realinv": 0.3,
# # NOTE: 'sigma2.f1' NOT included, as it's fixed to 1 by default.
# }

# assert set(test_params.keys()) == set(sm_dfm.param_names)
# params_vector = np.array([test_params[name] for name in sm_dfm.param_names])

# # Calculate the log-likelihood in statsmodels
# sm_loglike = sm_dfm.loglike(params_vector)

# # Build the PyMC model and compile its logp function
# with pm.Model() as model:
# factor_part = max(1, factor_order) * k_factors
# error_part = error_order * k_endog if error_order > 0 else 0
# k_states = factor_part + error_part

# pm.Normal("x0", mu=0.0, sigma=1.0, shape=(k_states,))
# pm.Deterministic("P0", pt.eye(k_states, k_states))

# pm.Normal("factor_loadings", mu=0.0, sigma=1.0, shape=(k_endog, k_factors))
# pm.Normal("factor_ar", mu=0.0, sigma=1.0, shape=(k_factors, factor_order))
# pm.Deterministic("factor_sigma", pt.ones(k_factors))

# pm.Normal("error_ar", mu=0.0, sigma=1.0, shape=(k_endog, error_order))
# pm.HalfNormal("error_sigma", sigma=1.0, shape=(k_endog,))

# dfm = BayesianDynamicFactor(
# k_factors=k_factors, factor_order=factor_order, k_endog=k_endog, error_order=error_order
# )

# dfm.build_statespace_graph(data, "data_ssm")
# logp_fn = model.compile_logp()

# # Evaluate the PyMC log-likelihood using the same parameters
# pymc_param_values = {
# "x0": np.zeros(k_states),
# #'P0_log__': np.log(np.eye(k_states)),
# "factor_loadings": np.array(
# [
# test_params["loading.f1.realgdp"],
# test_params["loading.f1.realcons"],
# test_params["loading.f1.realinv"],
# ]
# ).reshape(k_endog, k_factors),
# "factor_ar": np.array([test_params["L1.f1.f1"], test_params["L2.f1.f1"]]).reshape(
# k_factors, factor_order
# ),
# #'factor_sigma_log__': np.log(np.array([1.0])),
# "error_ar": np.array(
# [
# test_params["L1.e(realgdp).e(realgdp)"],
# test_params["L1.e(realcons).e(realcons)"],
# test_params["L1.e(realinv).e(realinv)"],
# ]
# ).reshape(k_endog, error_order),
# "error_sigma_log__": np.log(
# np.sqrt(
# np.array(
# [ # Log-transformed
# test_params["sigma2.realgdp"],
# test_params["sigma2.realcons"],
# test_params["sigma2.realinv"],
# ]
# )
# )
# ),
# }
# pymc_loglike = logp_fn(pymc_param_values)

# assert_allclose(
# pymc_loglike,
# sm_loglike,
# rtol=1e-5,
# err_msg="PyMC log-likelihood does not match statsmodels for manually specified parameters",
# )
Original file line number Diff line number Diff line change
@@ -9,16 +9,16 @@

from pymc_extras.statespace.models.ETS import BayesianETS
from pymc_extras.statespace.utils.constants import LONG_MATRIX_NAMES
from tests.statespace.shared_fixtures import rng
from tests.statespace.test_utilities import load_nile_test_data
from tests.statespace.utilities.shared_fixtures import rng
from tests.statespace.utilities.test_helpers import load_nile_test_data


@pytest.fixture(scope="session")
def data():
return load_nile_test_data()


def test_invalid_order_raises():
def tests_invalid_order_raises():
# Order must be length 3
with pytest.raises(ValueError, match="Order must be a tuple of three strings"):
BayesianETS(order=("A", "N"))
@@ -89,12 +89,6 @@ def test_order_flags(order, expected_flags):
assert getattr(mod, key) == value


def test_mode_argument():
# Mode argument should be passed to the parent class
mod = BayesianETS(order=("A", "N", "N"), mode="FAST_RUN")
assert mod.mode == "FAST_RUN"


@pytest.mark.parametrize("order, expected_params", zip(orders, order_params), ids=order_names)
def test_param_info(order: tuple[str, str, str], expected_params):
mod = BayesianETS(order=order, seasonal_periods=4)
Original file line number Diff line number Diff line change
@@ -18,10 +18,10 @@
SARIMAX_STATE_STRUCTURES,
SHORT_NAME_TO_LONG,
)
from tests.statespace.shared_fixtures import ( # pylint: disable=unused-import
from tests.statespace.utilities.shared_fixtures import ( # pylint: disable=unused-import
rng,
)
from tests.statespace.test_utilities import (
from tests.statespace.utilities.test_helpers import (
load_nile_test_data,
make_stationary_params,
simulate_from_numpy_model,
@@ -178,8 +178,8 @@ def pymc_mod(arima_mod):
# x0 = pm.Normal('x0', dims=['state'])
# P0_diag = pm.Gamma('P0_diag', alpha=2, beta=1, dims=['state'])
# P0 = pm.Deterministic('P0', pt.diag(P0_diag), dims=['state', 'state_aux'])
ar_params = pm.Normal("ar_params", sigma=0.1, dims=["lag_ar"])
ma_params = pm.Normal("ma_params", sigma=1, dims=["lag_ma"])
ar_params = pm.Normal("ar_params", sigma=0.1, dims=["ar_lag"])
ma_params = pm.Normal("ma_params", sigma=1, dims=["ma_lag"])
sigma_state = pm.Exponential("sigma_state", 0.5)
arima_mod.build_statespace_graph(data=data, save_kalman_filter_outputs_in_idata=True)

@@ -207,8 +207,8 @@ def pymc_mod_interp(arima_mod_interp):
P0 = pm.Deterministic(
"P0", pt.eye(arima_mod_interp.k_states) * P0_sigma, dims=["state", "state_aux"]
)
ar_params = pm.Normal("ar_params", sigma=0.1, dims=["lag_ar"])
ma_params = pm.Normal("ma_params", sigma=1, dims=["lag_ma"])
ar_params = pm.Normal("ar_params", sigma=0.1, dims=["ar_lag"])
ma_params = pm.Normal("ma_params", sigma=1, dims=["ma_lag"])
sigma_state = pm.Exponential("sigma_state", 0.5)
sigma_obs = pm.Exponential("sigma_obs", 0.1)

@@ -217,12 +217,6 @@ def pymc_mod_interp(arima_mod_interp):
return pymc_mod


def test_mode_argument():
# Mode argument should be passed to the parent class
mod = BayesianSARIMA(order=(0, 0, 3), mode="FAST_RUN", verbose=False)
assert mod.mode == "FAST_RUN"


@pytest.mark.parametrize(
"p,d,q,P,D,Q,S,expected_names",
[(*order, name) for order, name in zip(test_orders, test_state_names)],
@@ -258,9 +252,6 @@ def test_make_SARIMA_transition_matrix(p, d, q, P, D, Q, S):
"ignore:Non-stationary starting autoregressive parameters found",
"ignore:Non-invertible starting seasonal moving average",
"ignore:Non-stationary starting seasonal autoregressive",
"ignore:divide by zero encountered in matmul:RuntimeWarning",
"ignore:overflow encountered in matmul:RuntimeWarning",
"ignore:invalid value encountered in matmul:RuntimeWarning",
)
def test_SARIMAX_update_matches_statsmodels(p, d, q, P, D, Q, S, data, rng):
sm_sarimax = sm.tsa.SARIMAX(data, order=(p, d, q), seasonal_order=(P, D, Q, S))
@@ -344,8 +335,8 @@ def test_interpretable_states_are_interpretable(arima_mod_interp, pymc_mod_inter
prior = pm.sample_prior_predictive(draws=10)

prior_outputs = arima_mod_interp.sample_unconditional_prior(prior)
ar_lags = prior.prior.coords["lag_ar"].values - 1
ma_lags = prior.prior.coords["lag_ma"].values - 1
ar_lags = prior.prior.coords["ar_lag"].values - 1
ma_lags = prior.prior.coords["ma_lag"].values - 1

# Check the first p states are lags of the previous state
for t, tm1 in zip(ar_lags[1:], ar_lags[:-1]):
@@ -370,9 +361,6 @@ def test_interpretable_states_are_interpretable(arima_mod_interp, pymc_mod_inter
"ignore:Non-invertible starting MA parameters found.",
"ignore:Non-stationary starting autoregressive parameters found",
"ignore:Maximum Likelihood optimization failed to converge.",
"ignore:divide by zero encountered in matmul:RuntimeWarning",
"ignore:overflow encountered in matmul:RuntimeWarning",
"ignore:invalid value encountered in matmul:RuntimeWarning",
)
def test_representations_are_equivalent(p, d, q, P, D, Q, S, data, rng):
if (d + D) > 0:
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@

from pymc_extras.statespace import BayesianVARMAX
from pymc_extras.statespace.utils.constants import SHORT_NAME_TO_LONG
from tests.statespace.shared_fixtures import ( # pylint: disable=unused-import
from tests.statespace.utilities.shared_fixtures import ( # pylint: disable=unused-import
rng,
)

@@ -26,7 +26,7 @@
@pytest.fixture(scope="session")
def data():
df = pd.read_csv(
"tests/statespace/_data/statsmodels_macrodata_processed.csv",
"tests/statespace/test_data/statsmodels_macrodata_processed.csv",
index_col=0,
parse_dates=True,
).astype(floatX)
@@ -57,7 +57,7 @@ def pymc_mod(varma_mod, data):
"state_chol", n=varma_mod.k_posdef, eta=1, sd_dist=pm.Exponential.dist(1)
)
ar_params = pm.Normal(
"ar_params", mu=0, sigma=0.1, dims=["observed_state", "lag_ar", "observed_state_aux"]
"ar_params", mu=0, sigma=0.1, dims=["observed_state", "ar_lag", "observed_state_aux"]
)
state_cov = pm.Deterministic(
"state_cov", state_chol @ state_chol.T, dims=["shock", "shock_aux"]
@@ -77,12 +77,6 @@ def idata(pymc_mod, rng):
return idata


def test_mode_argument():
# Mode argument should be passed to the parent class
mod = BayesianVARMAX(k_endog=2, order=(3, 0), mode="FAST_RUN", verbose=False)
assert mod.mode == "FAST_RUN"


@pytest.mark.parametrize("order", orders, ids=ids)
@pytest.mark.parametrize("var", ["AR", "MA", "state_cov"])
@pytest.mark.filterwarnings("ignore::statsmodels.tools.sm_exceptions.EstimationWarning")