Commit 3b93328

feat: add clusterization E step

1 parent fecb2e1

File tree

10 files changed: +809 −14 lines


examples/readme_example/example_ml.py

Lines changed: 471 additions & 0 deletions
Large diffs are not rendered by default.
Six image files added (example plots; 706 KB, 757 KB, 628 KB, 153 KB, 153 KB, 161 KB). Previews not shown.

mpest/em/methods/likelihood_method.py

Lines changed: 187 additions & 14 deletions
@@ -3,19 +3,19 @@
 from functools import partial
 
 import numpy as np
+from scipy.stats import FitError, norm, weibull_min
 
 from mpest.core.distribution import Distribution
 from mpest.core.mixture_distribution import MixtureDistribution
 from mpest.core.problem import Problem, Result
 from mpest.em.methods.abstract_steps import AExpectation, AMaximization
-from mpest.exceptions import EStepError
-from mpest.models import AModel, AModelDifferentiable
+from mpest.exceptions import EStepError, SampleError
+from mpest.models import AModel, AModelDifferentiable, WeibullModelExp
 from mpest.optimizers import AOptimizerJacobian, TOptimizer
 from mpest.utils import ResultWithError
 
 EResult = tuple[Problem, np.ndarray] | ResultWithError[MixtureDistribution]
 
-
 class BayesEStep(AExpectation[EResult]):
     """
     Class which represents Bayesian method for calculating matrix for M step in likelihood method
@@ -69,17 +69,190 @@ def step(self, problem: Problem) -> EResult:
         return new_problem, h
 
 
-# class ML(AExpectation[EResult]):
-#     """
-#     Class which represents ML method for calculating matrix for M step in likelihood method
-#     """
-#
-#     def step(self, problem: Problem) -> EResult:
-#         """
-#         A function that performs E step
-#
-#         :param problem: Object of class Problem, which contains samples and mixture.
-#         """
+class ClusteringEStep(AExpectation[EResult]):
+    """
+    E step that uses clustering methods to recalculate mixture parameters.
+    Supported methods: DBSCAN (dbscan), Agglomerative (agglo), KMeans (kmeans).
+    Use accurate_init for the most accurate parameter values for each individual
+    component (recommended for mixtures of several different distributions).
+    """
+
+    MIN_SAMPLES = 2
+    MIN_PROB = 1e-100
+    MIN_COMPONENT_SIZE = 10
+
+    def __init__(self, models: list[AModel], clusterizer, eps: float = 0.3, accurate_init: bool = False) -> None:
+        self._n_components = len(models)
+        self._models = models
+        self._initialized = False
+        self._current_mixture = MixtureDistribution([])
+        self._eps = eps
+        self._accurate_init_flag = accurate_init
+        self._clusterizer = clusterizer
+
+    @staticmethod
+    def _estimate_weibull_params(data: np.ndarray) -> list[float]:
+        """Robust Weibull parameter estimation using MLE; falls back to a heuristic on failure"""
+        try:
+            params = weibull_min.fit(data, floc=0)  # returns (shape, loc, scale) with loc fixed at 0
+            return [float(params[0]), float(params[2])]
+        except (ValueError, TypeError, FitError):
+            return [0.5, float(np.mean(data))]
+
+    def _find_best_cluster_for_model(self, clusters: dict[int, np.ndarray],
+                                     model: AModel) -> tuple[int | None, list[float] | None, float]:
+        best_k, best_params, best_score = None, None, -np.inf
+        for k, X_k in clusters.items():
+            if len(X_k) < self.MIN_SAMPLES:
+                continue
+            try:
+                if isinstance(model, WeibullModelExp):
+                    params = self._estimate_weibull_params(X_k)
+                    params_arr = np.clip(params, [0.1, 0.1], [2.0, 1000.0])
+                    params = [float(params_arr[0]), float(params_arr[1])]
+                    # scale must be passed by keyword: the second positional argument of pdf is loc
+                    score = np.sum(np.log(weibull_min.pdf(X_k, params[0], scale=params[1])))
+                else:
+                    mean = np.mean(X_k)
+                    std = np.clip(np.std(X_k), 0.1, 100.0)
+                    params = [mean, std]
+                    score = np.sum(np.log(norm.pdf(X_k, mean, std)))
+                if score > best_score:
+                    best_score = score
+                    best_k = k
+                    best_params = params
+            except ValueError:
+                continue
+        return best_k, best_params, best_score
+
+    def _accurate_init(self, X: np.ndarray, labels: np.ndarray) -> tuple[list[tuple[AModel, list[float]]], list[float]]:
+        clusters = {k: X[labels == k] for k in range(self._n_components)}
+        distributions: list[tuple[AModel, list[float]]] = []
+        weights: list[float] = []
+        for model in self._models:
+            best_k, best_params, _ = self._find_best_cluster_for_model(clusters, model)
+
+            if best_k is None or best_params is None:
+                # no cluster fits this model: bootstrap a small resample and use default estimates
+                X_k = np.random.choice(X, size=self.MIN_COMPONENT_SIZE, replace=True)
+                weight = 1.0 / self._n_components
+                if isinstance(model, WeibullModelExp):
+                    best_params = self._estimate_weibull_params(X_k)
+                else:
+                    best_params = [np.mean(X_k), np.std(X_k)]
+            else:
+                weight = len(clusters[best_k]) / len(X)
+                clusters.pop(best_k)  # each cluster is assigned to at most one model
+
+            distributions.append((model, best_params))
+            weights.append(float(weight))
+
+        return distributions, weights
+
+    def _fast_init(self, X: np.ndarray, labels: np.ndarray) -> tuple[list[tuple[AModel, list[float]]], list[float]]:
+        distributions: list[tuple[AModel, list[float]]] = []
+        weights: list[float] = []
+        for k in range(self._n_components):
+            X_k = X[labels == k]
+            weight = len(X_k) / len(X)
+
+            if len(X_k) == 0:
+                X_k = np.random.choice(X, size=self.MIN_COMPONENT_SIZE, replace=True)
+                weight = 1.0 / self._n_components
+
+            model = self._models[k]
+            if isinstance(model, WeibullModelExp):
+                params = self._estimate_weibull_params(X_k)
+                params = [float(p) for p in np.clip(params, [0.1, 0.1], [2.0, 1000.0])]
+            else:
+                mean = np.mean(X_k)
+                std = np.clip(np.std(X_k), 0.1, 100.0)
+                params = [mean, std]
+
+            distributions.append((model, params))
+            weights.append(float(weight))
+        return distributions, weights
+
+    def _initialize_distributions(self, X: np.ndarray, labels: np.ndarray) -> MixtureDistribution:
+        """Initialization with distribution-aware parameter estimation"""
+        if self._accurate_init_flag:
+            distributions, weights = self._accurate_init(X, labels)
+        else:
+            distributions, weights = self._fast_init(X, labels)
+
+        total_weight = sum(weights)
+        normalized_weights: list[float | None] | None = [w / total_weight for w in weights]
+        self._current_mixture = MixtureDistribution.from_distributions(
+            [Distribution.from_params(dist[0].__class__, dist[1]) for dist in distributions],
+            normalized_weights,
+        )
+        self._initialized = True
+        return self._current_mixture
+
+    def _clusterize(self, X: np.ndarray, clusterizer) -> np.ndarray:
+        if hasattr(clusterizer, "n_clusters") and self._n_components != clusterizer.n_clusters:
+            raise EStepError("Count of components and clusters doesn't match.")
+        X = X.reshape(-1, 1)
+        labels = clusterizer.fit_predict(X)
+        if -1 in labels:
+            # DBSCAN marks noise points with -1; assign them to random components
+            labels[labels == -1] = np.random.choice(range(self._n_components), np.sum(labels == -1))
+        return labels
+
+    def step(self, problem: Problem) -> EResult:
+        """E step with improved numerical stability"""
+        samples = problem.samples
+        if not self._initialized:
+            try:
+                labels = self._clusterize(samples, self._clusterizer)
+                mixture_dist = self._initialize_distributions(samples, labels)
+            except EStepError as e:
+                return ResultWithError(problem.distributions, e)
+        else:
+            mixture_dist = problem.distributions
+
+        p_xij = []
+        active_samples = []
+
+        for x in samples:
+            p = np.zeros(len(mixture_dist.distributions))
+            for i, d in enumerate(mixture_dist.distributions):
+                try:
+                    pdf_val = d.model.pdf(x, d.params)
+                    p[i] = max(pdf_val, self.MIN_PROB)
+                except ValueError:
+                    p[i] = self.MIN_PROB
+
+            if np.any(p > self.MIN_PROB):
+                p_xij.append(p)
+                active_samples.append(x)
+
+        if not active_samples:
+            error = SampleError("None of the elements in the sample is correct for this mixture")
+            return ResultWithError(mixture_dist, error)
+
+        m = len(p_xij)
+        k = len(mixture_dist.distributions)
+        h = np.zeros([k, m], dtype=float)
+        curr_w = np.array([d.prior_probability or (1.0 / k) for d in mixture_dist.distributions])
+        curr_w /= curr_w.sum()
+
+        for i, p in enumerate(p_xij):
+            wp = curr_w * p
+            swp = np.sum(wp)
+
+            if swp < self.MIN_PROB:
+                # degenerate case: all densities underflow, fall back to the current weights
+                h[:, i] = curr_w / np.sum(curr_w)
+            else:
+                h[:, i] = wp / swp
+
+        new_problem = Problem(np.array(active_samples), mixture_dist)
+        return new_problem, h
 
 
 class LikelihoodMStep(AMaximization[EResult]):
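
For readers skimming the diff: the responsibility matrix h built at the end of step() is the standard EM E step. With component weights w_j (prior_probability, uniform if unset) and component densities p_j, each column is

h_{ji} = \frac{w_j \, p_j(x_i)}{\sum_{l=1}^{k} w_l \, p_l(x_i)},

with densities floored at MIN_PROB for numerical stability.

A minimal usage sketch (not part of this commit's diff; it mirrors the API exercised in tests/test_ml_estep/test_ml.py below, and the synthetic sample is a hypothetical stand-in for real data):

import numpy as np
from sklearn.cluster import KMeans

from mpest import Distribution, MixtureDistribution, Problem
from mpest.em.methods.likelihood_method import ClusteringEStep
from mpest.models import GaussianModel, WeibullModelExp
from mpest.utils import ResultWithError

models = [WeibullModelExp(), GaussianModel()]
e_step = ClusteringEStep(models, KMeans(n_clusters=len(models)))

samples = np.abs(np.random.default_rng(42).normal(5.0, 2.0, 500))  # hypothetical positive-valued data
mixture = MixtureDistribution.from_distributions(
    [
        Distribution.from_params(WeibullModelExp, [1.0, 1.0]),
        Distribution.from_params(GaussianModel, [0.0, 1.0]),
    ],
    [0.5, 0.5],
)
result = e_step.step(Problem(samples, mixture))

if isinstance(result, ResultWithError):
    print("E step failed")
else:
    new_problem, h = result  # h is the k-by-m responsibility matrix
    print(h.shape)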

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ pre-commit = "^4.1.0"
 ruff = "^0.9.6"
 mypy = "==1.13.0"
 sphinx = "^8.2.0"
+hypothesis = "^6.98.0"
 
 [tool.poetry.group.experiments.dependencies]
 tqdm = "^4.67.1"
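
The new hypothesis dev dependency supports the property-based test suite added below.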

tests/test_ml_estep/test_ml.py

Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
+import numpy as np
+import pytest
+from hypothesis import assume, given
+from hypothesis import strategies as st
+from sklearn.cluster import KMeans
+
+from mpest import Distribution, MixtureDistribution, Problem
+from mpest.em.methods.likelihood_method import ClusteringEStep
+from mpest.models import GaussianModel, WeibullModelExp
+from mpest.utils import ResultWithError
+
+WEIBULL_PARAMS_COUNT = 2
+MIN_COMPONENT_SIZE = 10
+
+
+def valid_weibull_data():
+    return st.lists(
+        st.floats(min_value=0.1, max_value=100, allow_nan=False, allow_infinity=False),
+        min_size=10, max_size=1000
+    ).map(np.array)
+
+
+def valid_gaussian_data():
+    return st.lists(
+        st.floats(min_value=-100, max_value=100, allow_nan=False, allow_infinity=False),
+        min_size=10, max_size=1000
+    ).map(np.array)
+
+
+def mixed_data():
+    return st.one_of(valid_weibull_data(), valid_gaussian_data())
+
+
+def mixture_problems():
+    return st.builds(
+        lambda samples: Problem(
+            samples,
+            MixtureDistribution.from_distributions([
+                Distribution.from_params(WeibullModelExp, [1.0, 1.0]),
+                Distribution.from_params(GaussianModel, [0.0, 1.0]),
+            ], [0.5, 0.5])
+        ),
+        mixed_data()
+    )
+
+
+class TestClusteringEStepInitialization:
+    @given(st.lists(st.sampled_from([WeibullModelExp(), GaussianModel()])),
+           st.integers(min_value=0, max_value=1000))
+    def test_initialization(self, models, labels_seed):
+        assume(len(models) > 0)
+        np.random.seed(labels_seed)
+        labels = np.random.randint(0, len(models), size=100)
+        ml = ClusteringEStep(models, labels)
+        assert ml._n_components == len(models)
+        assert len(ml._models) == len(models)
+        assert not ml._initialized
+        assert ml._current_mixture.distributions == []
+
+
+class TestWeibullParamEstimation:
+
+    @given(valid_weibull_data())
+    def test_weibull_param_estimation(self, data):
+        models = [WeibullModelExp(), GaussianModel()]
+        ml = ClusteringEStep(models, np.zeros(len(data), dtype=int))
+        params = ml._estimate_weibull_params(data)
+        assert len(params) == WEIBULL_PARAMS_COUNT
+        assert params[0] > 0
+        assert params[1] > 0
+
+    @given(st.lists(st.floats(min_value=0, max_value=0, allow_nan=False), min_size=1))
+    def test_weibull_param_estimation_with_bad_data(self, data):
+        models = [WeibullModelExp(), GaussianModel()]
+        ml = ClusteringEStep(models, np.zeros(len(data), dtype=int))
+        params = ml._estimate_weibull_params(np.array(data))
+        assert params[0] > 0
+        assert isinstance(params[1], float)
+
+
+class TestDistributionInitialization:
+    @given(mixed_data(), st.booleans())
+    def test_initialization(self, data, accurate_init):
+        assume(len(data) >= MIN_COMPONENT_SIZE)
+        models = [WeibullModelExp(), GaussianModel()]
+        labels = np.random.randint(0, len(models), size=len(data))
+        ml = ClusteringEStep(models, labels, accurate_init=accurate_init)
+        mixture = ml._initialize_distributions(data, labels)
+        assert len(mixture.distributions) == len(models)
+        for dist in mixture.distributions:
+            assert dist.params is not None
+            if isinstance(dist.model, WeibullModelExp):
+                assert dist.params[0] > 0
+                assert dist.params[1] > 0
+            else:
+                assert dist.params[1] > 0
+
+
+class TestEStep:
+    @given(mixture_problems())
+    def test_e_step(self, problem):
+        models = [WeibullModelExp(), GaussianModel()]
+        clusterizer = KMeans(n_clusters=len(models))
+        ml = ClusteringEStep(models, clusterizer)
+        result = ml.step(problem)
+
+        if isinstance(result, ResultWithError):
+            pytest.fail("Unexpected error in E-step")
+        else:
+            new_problem, h = result
+            assert len(new_problem.samples) <= len(problem.samples)
+            assert h.shape[0] == len(models)
+            if len(new_problem.samples) > 0:
+                assert h.shape[1] == len(new_problem.samples)
+                for col in h.T:
+                    assert pytest.approx(1.0, abs=1e-6) == sum(col)
+
+    @given(st.lists(st.floats(min_value=0, max_value=1, allow_nan=False)),
+           st.integers(min_value=1, max_value=10))
+    def test_e_step_with_empty_cluster(self, data, n_components):
+        data = np.array(data)
+        assume(len(data) >= n_components)
+        models = [WeibullModelExp() if i % 2 else GaussianModel() for i in range(n_components)]
+        clusterizer = KMeans(n_clusters=len(models))
+        ml = ClusteringEStep(models, clusterizer)
+
+        initial_mixture = MixtureDistribution.from_distributions(
+            [Distribution.from_params(model.__class__, [1.0, 1.0]) for model in models]
+        )
+        problem = Problem(data, initial_mixture)
+
+        result = ml.step(problem)
+        if isinstance(result, ResultWithError):
+            pytest.fail("Unexpected error in E-step")
+        else:
+            new_problem, h = result
+            assert h.shape == (n_components, len(new_problem.samples))
+
+
+class TestEdgeCases:
+    @given(mixed_data())
+    def test_single_component(self, data):
+        assume(len(data) >= MIN_COMPONENT_SIZE)
+        models = [WeibullModelExp()]
+        labels = np.zeros(len(data), dtype=int)
+        ml = ClusteringEStep(models, labels)
+        mixture = ml._initialize_distributions(data, labels)
+        assert len(mixture.distributions) == 1
+        assert mixture.distributions[0].params[0] > 0
+        assert mixture.distributions[0].params[1] > 0
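
Assuming a standard pytest setup (nothing required beyond the hypothesis and scikit-learn dependencies above), the new suite can be run with:

python -m pytest tests/test_ml_estep/test_ml.py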
