From bff5ddc0c450580c19d4b86cb38e0cbf964abcaa Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Sat, 23 Oct 2021 09:44:41 +0200 Subject: [PATCH 1/8] added TfidfTransformer and TfidfVectorizer to feature_extraction.text just the skeleton (no tests yet) --- .gitignore | 2 + dask_ml/feature_extraction/text.py | 413 ++++++++++++++++++++++++++++- 2 files changed, 413 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 4c9132fad..b7707ff9a 100644 --- a/.gitignore +++ b/.gitignore @@ -122,3 +122,5 @@ docs/source/auto_examples/ docs/source/examples/mydask.png dask-worker-space +/.project +/.pydevproject diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index a647ddee7..731a2ab95 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -12,9 +12,13 @@ import scipy.sparse import sklearn.base import sklearn.feature_extraction.text +import sklearn.preprocessing from dask.delayed import Delayed from distributed import get_client, wait from sklearn.utils.validation import check_is_fitted +from builtins import getattr + +FLOAT_DTYPES = (np.float64, np.float32, np.float16) class _BaseHasher(sklearn.base.BaseEstimator): @@ -116,6 +120,35 @@ def _hasher(self): return sklearn.feature_extraction.text.FeatureHasher +def _n_samples(X): + """Count the number of samples sparse X.""" + def chunk_n_samples(chunk, axis, keepdims): + return np.array([chunk.shape[0]]) + + return da.reduction(X, + chunk=chunk_n_samples, + aggregate=np.sum, + concatenate=False, + dtype=X.dtype).compute() + + +def _document_frequency(X, dtype): + """Count the number of non-zero values for each feature in sparse X.""" + def chunk_doc_freq(chunk, axis, keepdims): + if scipy.sparse.isspmatrix_csr(chunk): + arr = np.bincount(chunk.indices) + return np.pad(arr, (0, chunk.shape[1] - len(arr))) + else: + return np.diff(chunk.indptr) + + return da.reduction(X, + chunk=chunk_doc_freq, + aggregate=np.sum, + axis=0, + concatenate=False, + dtype=dtype).compute().astype(dtype) + + class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): """Convert a collection of text documents to a matrix of token counts @@ -167,7 +200,11 @@ class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): """ def fit_transform(self, raw_documents, y=None): - params = self.get_params() + subclass_instance_params = self.get_params() + excluded_keys = getattr(self, '_non_CountVectorizer_params', []) + params = {key: subclass_instance_params[key] + for key in subclass_instance_params + if key not in excluded_keys} vocabulary = params.pop("vocabulary") vocabulary_for_transform = vocabulary @@ -201,7 +238,11 @@ def fit_transform(self, raw_documents, y=None): return result def transform(self, raw_documents): - params = self.get_params() + subclass_instance_params = self.get_params() + excluded_keys = getattr(self, '_non_CountVectorizer_params', []) + params = {key: subclass_instance_params[key] + for key in subclass_instance_params + if key not in excluded_keys} vocabulary = params.pop("vocabulary") if vocabulary is None: @@ -229,6 +270,370 @@ def transform(self, raw_documents): return build_array(transformed, n_features, meta) +class TfidfTransformer(sklearn.feature_extraction.text.TfidfTransformer): + """Transform a count matrix to a normalized tf or tf-idf representation + + See Also + -------- + sklearn.feature_extraction.text.TfidfTransformer + + Examples + -------- + >>> from dask_ml.feature_extraction.text import TfidfTransformer + >>> from dask_ml.feature_extraction.text import CountVectorizer + >>> from sklearn.pipeline import Pipeline + >>> import numpy as np + >>> corpus = ['this is the first document', + ... 'this document is the second document', + ... 'and this is the third one', + ... 'is this the first document'] + >>> X = CountVectorizer().fit_transform(corpus) + dask.array + >>> X.compute().toarray() + array([[0, 1, 1, 1, 0, 0, 1, 0, 1], + [0, 2, 0, 1, 0, 1, 1, 0, 1], + [1, 0, 0, 1, 1, 0, 1, 1, 1], + [0, 1, 1, 1, 0, 0, 1, 0, 1]]) + >>> transformer = TfidfTransformer().fit(X) + TfidfTransformer() + >>> transformer.idf_ + array([1.91629073, 1.22314355, 1.51082562, 1. , 1.91629073, + 1.91629073, 1. , 1.91629073, 1. ]) + >>> transformer.transform(X).compute().shape + (4, 9) + """ + def fit(self, X, y=None): + """Learn the idf vector (global term weights). + + Parameters + ---------- + X : sparse matrix of shape n_samples, n_features) + A matrix of term/token counts. + """ + # X = check_array(X, accept_sparse=('csr', 'csc')) + # if not sp.issparse(X): + # X = sp.csr_matrix(X) + dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64 + + if self.use_idf: + n_samples, n_features = _n_samples(X), X.shape[1] + df = _document_frequency(X, dtype) + # df = df.astype(dtype, **_astype_copy_false(df)) + + # perform idf smoothing if required + df += int(self.smooth_idf) + n_samples += int(self.smooth_idf) + + # log+1 instead of log makes sure terms with zero idf don't get + # suppressed entirely. + idf = np.log(n_samples / df) + 1 + self._idf_diag = scipy.sparse.diags( + idf, + offsets=0, + shape=(n_features, n_features), + format="csr", + dtype=dtype, + ) + + return self + + def transform(self, X, copy=True): + """Transform a count matrix to a tf or tf-idf representation + + Parameters + ---------- + X : sparse matrix of (n_samples, n_features) + a matrix of term/token counts + + copy : bool, default=True + Whether to copy X and operate on the copy or perform in-place + operations. + + Returns + ------- + vectors : sparse matrix of shape (n_samples, n_features) + """ + # X = self._validate_data( + # X, accept_sparse="csr", dtype=FLOAT_DTYPES, copy=copy, reset=False + # ) + # if not sp.issparse(X): + # X = sp.csr_matrix(X, dtype=np.float64) + + def _astype(chunk): + c = chunk.copy() + c.data = chunk.data.astype(np.float64) + return c + + def _one_plus_log(chunk): + c = chunk.copy() + c.data = np.log(chunk.data, dtype=np.float64) + c.data += 1 + return c + + def _dot_idf_diag(chunk): + return chunk * self._idf_diag + + if X.dtype != np.float64: + X = X.map_blocks(_astype, dtype=np.float64) + + if self.sublinear_tf: + X = X.map_blocks(_one_plus_log, dtype=np.float64) + + if self.use_idf: + # idf_ being a property, the automatic attributes detection + # does not work as usual and we need to specify the attribute + # name: + check_is_fitted(self, attributes=["idf_"], msg="idf vector is not fitted") + + # *= doesn't work + X = X.map_blocks(_dot_idf_diag, dtype=np.float64) + + if self.norm: + X = X.map_blocks(_normalize_transform, + dtype=np.float64, + norm=self.norm) + + return X + + +class TfidfVectorizer(CountVectorizer): + r"""Convert a collection of raw documents to a matrix of TF-IDF features. + + Equivalent to :class:`CountVectorizer` followed by + :class:`TfidfTransformer`. + + See Also + -------- + sklearn.feature_extraction.text.TfidfVectorizer + + Examples + -------- + The Dask-ML implementation currently requires that ``raw_documents`` + is a :class:`dask.bag.Bag` of documents (lists of strings). + + >>> from dask_ml.feature_extraction.text import TfidfVectorizer + >>> import dask.bag as db + >>> from distributed import Client + >>> client = Client() + >>> corpus = [ + ... 'This is the first document.', + ... 'This document is the second document.', + ... 'And this is the third one.', + ... 'Is this the first document?', + ... ] + >>> corpus = db.from_sequence(corpus, npartitions=2) + >>> vectorizer = TfidfVectorizer() + >>> X = vectorizer.fit_transform(corpus) + dask.array + >>> X.compute().toarray() + array([[0. , 0.46979139, 0.58028582, 0.38408524, 0. , + 0. , 0.38408524, 0. , 0.38408524], + [0. , 0.6876236 , 0. , 0.28108867, 0. , + 0.53864762, 0.28108867, 0. , 0.28108867], + [0.51184851, 0. , 0. , 0.26710379, 0.51184851, + 0. , 0.26710379, 0.51184851, 0.26710379], + [0. , 0.46979139, 0.58028582, 0.38408524, 0. , + 0. , 0.38408524, 0. , 0.38408524]]) + >>> vectorizer.get_feature_names() + ['and', 'document', 'first', 'is', 'one', 'second', 'the', 'third', 'this'] + """ + + def __init__( + self, + *, + input="content", + encoding="utf-8", + decode_error="strict", + strip_accents=None, + lowercase=True, + preprocessor=None, + tokenizer=None, + analyzer="word", + stop_words=None, + token_pattern=r"(?u)\b\w\w+\b", + ngram_range=(1, 1), + max_df=1.0, + min_df=1, + max_features=None, + vocabulary=None, + binary=False, + dtype=np.float64, + norm="l2", + use_idf=True, + smooth_idf=True, + sublinear_tf=False, + ): + + super().__init__( + input=input, + encoding=encoding, + decode_error=decode_error, + strip_accents=strip_accents, + lowercase=lowercase, + preprocessor=preprocessor, + tokenizer=tokenizer, + analyzer=analyzer, + stop_words=stop_words, + token_pattern=token_pattern, + ngram_range=ngram_range, + max_df=max_df, + min_df=min_df, + max_features=max_features, + vocabulary=vocabulary, + binary=binary, + dtype=dtype, + ) + + self._non_CountVectorizer_params = ['norm', 'use_idf', + 'smooth_idf', 'sublinear_tf'] + self._tfidf = TfidfTransformer( + norm=norm, use_idf=use_idf, smooth_idf=smooth_idf, sublinear_tf=sublinear_tf + ) + + # Broadcast the TF-IDF parameters to the underlying transformer instance + # for easy grid search and repr + + @property + def norm(self): + """Norm of each row output, can be either "l1" or "l2".""" + return self._tfidf.norm + + @norm.setter + def norm(self, value): + self._tfidf.norm = value + + @property + def use_idf(self): + """Whether or not IDF re-weighting is used.""" + return self._tfidf.use_idf + + @use_idf.setter + def use_idf(self, value): + self._tfidf.use_idf = value + + @property + def smooth_idf(self): + """Whether or not IDF weights are smoothed.""" + return self._tfidf.smooth_idf + + @smooth_idf.setter + def smooth_idf(self, value): + self._tfidf.smooth_idf = value + + @property + def sublinear_tf(self): + """Whether or not sublinear TF scaling is applied.""" + return self._tfidf.sublinear_tf + + @sublinear_tf.setter + def sublinear_tf(self, value): + self._tfidf.sublinear_tf = value + + @property + def idf_(self): + """Inverse document frequency vector, only defined if `use_idf=True`. + + Returns + ------- + ndarray of shape (n_features,) + """ + return self._tfidf.idf_ + + @idf_.setter + def idf_(self, value): + self._validate_vocabulary() + if hasattr(self, "vocabulary_"): + if len(self.vocabulary_) != len(value): + raise ValueError( + "idf length = %d must be equal to vocabulary size = %d" + % (len(value), len(self.vocabulary)) + ) + self._tfidf.idf_ = value + + def _check_params(self): + if self.dtype not in FLOAT_DTYPES: + warnings.warn( + "Only {} 'dtype' should be used. {} 'dtype' will " + "be converted to np.float64.".format(FLOAT_DTYPES, self.dtype), + UserWarning, + ) + + def fit(self, raw_documents, y=None): + """Learn vocabulary and idf from training set. + + Parameters + ---------- + raw_documents : iterable + An iterable which generates either str, unicode or file objects. + + y : None + This parameter is not needed to compute tfidf. + + Returns + ------- + self : object + Fitted vectorizer. + """ + self._check_params() + self._warn_for_unused_params() + X = super().fit_transform(raw_documents, + y=self._non_CountVectorizer_params) + self._tfidf.fit(X) + return self + + def fit_transform(self, raw_documents, y=None): + """Learn vocabulary and idf, return document-term matrix. + + This is equivalent to fit followed by transform, but more efficiently + implemented. + + Parameters + ---------- + raw_documents : iterable + An iterable which generates either str, unicode or file objects. + + y : None + This parameter is ignored. + + Returns + ------- + X : sparse matrix of (n_samples, n_features) + Tf-idf-weighted document-term matrix. + """ + self._check_params() + X = super().fit_transform(raw_documents) + self._tfidf.fit(X) + # X is already a transformed view of raw_documents so + # we set copy to False + return self._tfidf.transform(X) + + def transform(self, raw_documents): + """Transform documents to document-term matrix. + + Uses the vocabulary and document frequencies (df) learned by fit (or + fit_transform). + + Parameters + ---------- + raw_documents : iterable + An iterable which generates either str, unicode or file objects. + + Returns + ------- + X : sparse matrix of (n_samples, n_features) + Tf-idf-weighted document-term matrix. + """ + check_is_fitted(self, msg="The TF-IDF vectorizer is not fitted") + + X = super().transform(raw_documents) + return self._tfidf.transform(X, copy=False) + + def _more_tags(self): + return {"X_types": ["string"], "_skip_test": True} + + def build_array(bag, n_features, meta): name = "from-bag-" + bag.name layer = {(name, i, 0): (k, i) for k, i in bag.__dask_keys__()} @@ -257,6 +662,10 @@ def vocabulary_length(vocabulary): raise ValueError(f"Unknown vocabulary type {type(vocabulary)}.") +def _normalize_transform(chunk, norm): + return sklearn.preprocessing.normalize(chunk, norm=norm) + + def _count_vectorizer_transform(partition, vocabulary, params): model = sklearn.feature_extraction.text.CountVectorizer( vocabulary=vocabulary, **params From ebedfa8e98d80011899139925b172c9f4d0add16 Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Sat, 23 Oct 2021 17:34:34 +0200 Subject: [PATCH 2/8] exploited minlength parameter in numpy.bincount(); added a few tests --- dask_ml/feature_extraction/text.py | 23 ++++++------ tests/feature_extraction/test_text.py | 50 +++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index 731a2ab95..d52b98ba4 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -121,7 +121,7 @@ def _hasher(self): def _n_samples(X): - """Count the number of samples sparse X.""" + """Count the number of samples dask array X.""" def chunk_n_samples(chunk, axis, keepdims): return np.array([chunk.shape[0]]) @@ -133,11 +133,10 @@ def chunk_n_samples(chunk, axis, keepdims): def _document_frequency(X, dtype): - """Count the number of non-zero values for each feature in sparse X.""" + """Count the number of non-zero values for each feature in dask array X.""" def chunk_doc_freq(chunk, axis, keepdims): if scipy.sparse.isspmatrix_csr(chunk): - arr = np.bincount(chunk.indices) - return np.pad(arr, (0, chunk.shape[1] - len(arr))) + return np.bincount(chunk.indices, minlength=chunk.shape[1]) else: return np.diff(chunk.indptr) @@ -361,11 +360,10 @@ def transform(self, X, copy=True): # X = sp.csr_matrix(X, dtype=np.float64) def _astype(chunk): - c = chunk.copy() - c.data = chunk.data.astype(np.float64) - return c + return chunk.astype(np.float64, copy=True) def _one_plus_log(chunk): + # transforms nonzero elements x of csr_matrix: x -> 1 + log(x) c = chunk.copy() c.data = np.log(chunk.data, dtype=np.float64) c.data += 1 @@ -374,11 +372,12 @@ def _one_plus_log(chunk): def _dot_idf_diag(chunk): return chunk * self._idf_diag + meta = scipy.sparse.eye(0, format="csr") if X.dtype != np.float64: - X = X.map_blocks(_astype, dtype=np.float64) + X = X.map_blocks(_astype, dtype=np.float64, meta=meta) if self.sublinear_tf: - X = X.map_blocks(_one_plus_log, dtype=np.float64) + X = X.map_blocks(_one_plus_log, dtype=np.float64, meta=meta) if self.use_idf: # idf_ being a property, the automatic attributes detection @@ -386,13 +385,13 @@ def _dot_idf_diag(chunk): # name: check_is_fitted(self, attributes=["idf_"], msg="idf vector is not fitted") - # *= doesn't work - X = X.map_blocks(_dot_idf_diag, dtype=np.float64) + X = X.map_blocks(_dot_idf_diag, dtype=np.float64, meta=meta) if self.norm: X = X.map_blocks(_normalize_transform, dtype=np.float64, - norm=self.norm) + norm=self.norm, + meta=meta) return X diff --git a/tests/feature_extraction/test_text.py b/tests/feature_extraction/test_text.py index 01323f106..fb21542b1 100644 --- a/tests/feature_extraction/test_text.py +++ b/tests/feature_extraction/test_text.py @@ -183,3 +183,53 @@ def test_count_vectorizer_remote_vocabulary(): ) m.fit_transform(b) assert m.vocabulary_ is remote_vocabulary + + +@pytest.mark.parametrize("distributed", [True, False]) +@pytest.mark.parametrize("norm", ["l1", "l2"]) +@pytest.mark.parametrize("use_idf", [True, False]) +@pytest.mark.parametrize("smooth_idf", [True, False]) +@pytest.mark.parametrize("sublinear_tf", [True, False]) +def test_tfidf_vectorizer(distributed, + norm, + use_idf, + smooth_idf, + sublinear_tf): + m1 = (sklearn.feature_extraction.text + .TfidfVectorizer(norm=norm, + use_idf=use_idf, + smooth_idf=smooth_idf, + sublinear_tf=sublinear_tf)) + b = db.from_sequence(JUNK_FOOD_DOCS, npartitions=2) + r1 = m1.fit_transform(JUNK_FOOD_DOCS) + + m2 = (dask_ml.feature_extraction.text + .TfidfVectorizer(norm=norm, + use_idf=use_idf, + smooth_idf=smooth_idf, + sublinear_tf=sublinear_tf)) + + if distributed: + client = Client() # noqa + else: + client = dummy_context() + + r2 = m2.fit_transform(b) + + with client: + exclude = {"vocabulary_actor_", "stop_words_"} + if not use_idf: + # idf_ being a property, the automatic attributes detection + # does not work as usual so we will exclude it in this case: + exclude.add("idf_") + assert_estimator_equal(m1, m2, exclude=exclude) + assert isinstance(r2, da.Array) + assert isinstance(r2._meta, scipy.sparse.csr_matrix) + np.testing.assert_array_almost_equal(r1.toarray(), + r2.compute().toarray()) + + r3 = m2.transform(b) + assert isinstance(r3, da.Array) + assert isinstance(r3._meta, scipy.sparse.csr_matrix) + np.testing.assert_array_almost_equal(r1.toarray(), + r3.compute().toarray()) From 1ac8e38b380a99619841024b04fde67f0ad5f962 Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Sun, 24 Oct 2021 11:07:02 +0200 Subject: [PATCH 3/8] added dask.dataframe.Series support to CountVectorizer & TfidfVectorizer --- dask_ml/feature_extraction/text.py | 114 ++++++++++++++++++-------- tests/feature_extraction/test_text.py | 11 ++- 2 files changed, 86 insertions(+), 39 deletions(-) diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index d52b98ba4..169d28d43 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -8,6 +8,7 @@ import dask.bag as db import dask.dataframe as dd import distributed +import pandas as pd import numpy as np import scipy.sparse import sklearn.base @@ -16,7 +17,6 @@ from dask.delayed import Delayed from distributed import get_client, wait from sklearn.utils.validation import check_is_fitted -from builtins import getattr FLOAT_DTYPES = (np.float64, np.float32, np.float16) @@ -120,18 +120,6 @@ def _hasher(self): return sklearn.feature_extraction.text.FeatureHasher -def _n_samples(X): - """Count the number of samples dask array X.""" - def chunk_n_samples(chunk, axis, keepdims): - return np.array([chunk.shape[0]]) - - return da.reduction(X, - chunk=chunk_n_samples, - aggregate=np.sum, - concatenate=False, - dtype=X.dtype).compute() - - def _document_frequency(X, dtype): """Count the number of non-zero values for each feature in dask array X.""" def chunk_doc_freq(chunk, axis, keepdims): @@ -172,7 +160,9 @@ class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): Examples -------- The Dask-ML implementation currently requires that ``raw_documents`` - is a :class:`dask.bag.Bag` of documents (lists of strings). + is either a :class:`dask.bag.Bag` of documents (lists of strings) or + a :class:`dask.dataframe.Series` of documents (Series of strings) + with partitions of type :class:`pandas.Series`. >>> from dask_ml.feature_extraction.text import CountVectorizer >>> import dask.bag as db @@ -184,10 +174,25 @@ class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): ... 'And this is the third one.', ... 'Is this the first document?', ... ] - >>> corpus = db.from_sequence(corpus, npartitions=2) + >>> corpus_bag = db.from_sequence(corpus, npartitions=2) >>> vectorizer = CountVectorizer() - >>> X = vectorizer.fit_transform(corpus) - dask.array>> X = vectorizer.fit_transform(corpus_bag) + dask.array + >>> X.compute().toarray() + array([[0, 1, 1, 1, 0, 0, 1, 0, 1], + [0, 2, 0, 1, 0, 1, 1, 0, 1], + [1, 0, 0, 1, 1, 0, 1, 1, 1], + [0, 1, 1, 1, 0, 0, 1, 0, 1]]) + >>> vectorizer.get_feature_names() + ['and', 'document', 'first', 'is', 'one', 'second', 'the', 'third', 'this'] + + >>> import dask.dataframe as dd + >>> import pandas as pd + >>> corpus_dds = dd.from_pandas(pd.Series(corpus), npartitions=2) + >>> vectorizer = CountVectorizer() + >>> X = vectorizer.fit_transform(corpus_dds) + dask.array >>> X.compute().toarray() array([[0, 1, 1, 1, 0, 0, 1, 0, 1], @@ -199,13 +204,17 @@ class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): """ def fit_transform(self, raw_documents, y=None): + # Note that in general 'self' could refer to an instance of either this + # class or a subclass of this class. Hence it is possible that + # self.get_params() could get unexpected parameters of an instance of a + # subclass. Such parameters need to be excluded here: subclass_instance_params = self.get_params() excluded_keys = getattr(self, '_non_CountVectorizer_params', []) params = {key: subclass_instance_params[key] for key in subclass_instance_params if key not in excluded_keys} - vocabulary = params.pop("vocabulary") + vocabulary = params.pop("vocabulary") vocabulary_for_transform = vocabulary if self.vocabulary is not None: @@ -217,19 +226,22 @@ def fit_transform(self, raw_documents, y=None): fixed_vocabulary = False # Case 2: learn vocabulary from the data. vocabularies = raw_documents.map_partitions(_build_vocabulary, params) - vocabulary = vocabulary_for_transform = _merge_vocabulary( - *vocabularies.to_delayed() - ) + vocabulary = vocabulary_for_transform = ( + _merge_vocabulary( *vocabularies.to_delayed() )) vocabulary_for_transform = vocabulary_for_transform.persist() vocabulary_ = vocabulary.compute() n_features = len(vocabulary_) - result = raw_documents.map_partitions( - _count_vectorizer_transform, vocabulary_for_transform, params - ) - meta = scipy.sparse.eye(0, format="csr", dtype=self.dtype) - result = build_array(result, n_features, meta) + if isinstance(raw_documents, dd.Series): + result = raw_documents.map_partitions( + _count_vectorizer_transform, vocabulary_for_transform, + params, meta=meta) + else: + result = raw_documents.map_partitions( + _count_vectorizer_transform, vocabulary_for_transform, params) + result = build_array(result, n_features, meta) + result.compute_chunk_sizes() self.vocabulary_ = vocabulary_ self.fixed_vocabulary_ = fixed_vocabulary @@ -237,6 +249,10 @@ def fit_transform(self, raw_documents, y=None): return result def transform(self, raw_documents): + # Note that in general 'self' could refer to an instance of either this + # class or a subclass of this class. Hence it is possible that + # self.get_params() could get unexpected parameters of an instance of a + # subclass. Such parameters need to be excluded here: subclass_instance_params = self.get_params() excluded_keys = getattr(self, '_non_CountVectorizer_params', []) params = {key: subclass_instance_params[key] @@ -262,12 +278,17 @@ def transform(self, raw_documents): vocabulary_for_transform = vocabulary n_features = vocabulary_length(vocabulary_for_transform) - transformed = raw_documents.map_partitions( - _count_vectorizer_transform, vocabulary_for_transform, params - ) meta = scipy.sparse.eye(0, format="csr", dtype=self.dtype) - return build_array(transformed, n_features, meta) - + if isinstance(raw_documents, dd.Series): + result = raw_documents.map_partitions( + _count_vectorizer_transform, vocabulary_for_transform, + params, meta=meta) + else: + transformed = raw_documents.map_partitions( + _count_vectorizer_transform, vocabulary_for_transform, params) + result = build_array(transformed, n_features, meta) + result.compute_chunk_sizes() + return result class TfidfTransformer(sklearn.feature_extraction.text.TfidfTransformer): """Transform a count matrix to a normalized tf or tf-idf representation @@ -316,7 +337,7 @@ def fit(self, X, y=None): dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64 if self.use_idf: - n_samples, n_features = _n_samples(X), X.shape[1] + n_samples, n_features = X.shape df = _document_frequency(X, dtype) # df = df.astype(dtype, **_astype_copy_false(df)) @@ -409,7 +430,9 @@ class TfidfVectorizer(CountVectorizer): Examples -------- The Dask-ML implementation currently requires that ``raw_documents`` - is a :class:`dask.bag.Bag` of documents (lists of strings). + is either a :class:`dask.bag.Bag` of documents (lists of strings) or + a :class:`dask.dataframe.Series` of documents (Series of strings) + with partitions of type :class:`pandas.Series`. >>> from dask_ml.feature_extraction.text import TfidfVectorizer >>> import dask.bag as db @@ -421,10 +444,29 @@ class TfidfVectorizer(CountVectorizer): ... 'And this is the third one.', ... 'Is this the first document?', ... ] - >>> corpus = db.from_sequence(corpus, npartitions=2) + >>> corpus_bag = db.from_sequence(corpus, npartitions=2) + >>> vectorizer = TfidfVectorizer() + >>> X = vectorizer.fit_transform(corpus_bag) + dask.array + >>> X.compute().toarray() + array([[0. , 0.46979139, 0.58028582, 0.38408524, 0. , + 0. , 0.38408524, 0. , 0.38408524], + [0. , 0.6876236 , 0. , 0.28108867, 0. , + 0.53864762, 0.28108867, 0. , 0.28108867], + [0.51184851, 0. , 0. , 0.26710379, 0.51184851, + 0. , 0.26710379, 0.51184851, 0.26710379], + [0. , 0.46979139, 0.58028582, 0.38408524, 0. , + 0. , 0.38408524, 0. , 0.38408524]]) + >>> vectorizer.get_feature_names() + ['and', 'document', 'first', 'is', 'one', 'second', 'the', 'third', 'this'] + + >>> import dask.dataframe as dd + >>> import pandas as pd + >>> corpus_dds = dd.from_pandas(pd.Series(corpus), npartitions=2) >>> vectorizer = TfidfVectorizer() - >>> X = vectorizer.fit_transform(corpus) - dask.array>> X = vectorizer.fit_transform(corpus_dds) + dask.array >>> X.compute().toarray() array([[0. , 0.46979139, 0.58028582, 0.38408524, 0. , diff --git a/tests/feature_extraction/test_text.py b/tests/feature_extraction/test_text.py index fb21542b1..01c50200c 100644 --- a/tests/feature_extraction/test_text.py +++ b/tests/feature_extraction/test_text.py @@ -186,11 +186,13 @@ def test_count_vectorizer_remote_vocabulary(): @pytest.mark.parametrize("distributed", [True, False]) +@pytest.mark.parametrize("collection_type", ["Bag", "Series"]) @pytest.mark.parametrize("norm", ["l1", "l2"]) @pytest.mark.parametrize("use_idf", [True, False]) @pytest.mark.parametrize("smooth_idf", [True, False]) @pytest.mark.parametrize("sublinear_tf", [True, False]) def test_tfidf_vectorizer(distributed, + collection_type, norm, use_idf, smooth_idf, @@ -200,7 +202,10 @@ def test_tfidf_vectorizer(distributed, use_idf=use_idf, smooth_idf=smooth_idf, sublinear_tf=sublinear_tf)) - b = db.from_sequence(JUNK_FOOD_DOCS, npartitions=2) + if collection_type == "Bag": + docs = db.from_sequence(JUNK_FOOD_DOCS, npartitions=2) + elif collection_type == "Series": + docs = dd.from_pandas(pd.Series(JUNK_FOOD_DOCS), npartitions=2) r1 = m1.fit_transform(JUNK_FOOD_DOCS) m2 = (dask_ml.feature_extraction.text @@ -214,7 +219,7 @@ def test_tfidf_vectorizer(distributed, else: client = dummy_context() - r2 = m2.fit_transform(b) + r2 = m2.fit_transform(docs) with client: exclude = {"vocabulary_actor_", "stop_words_"} @@ -228,7 +233,7 @@ def test_tfidf_vectorizer(distributed, np.testing.assert_array_almost_equal(r1.toarray(), r2.compute().toarray()) - r3 = m2.transform(b) + r3 = m2.transform(docs) assert isinstance(r3, da.Array) assert isinstance(r3._meta, scipy.sparse.csr_matrix) np.testing.assert_array_almost_equal(r1.toarray(), From 1fa55cae9f2c60bddd178a881ab3a2ca87a544f9 Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Mon, 25 Oct 2021 16:38:14 +0200 Subject: [PATCH 4/8] removed all unnecessary calls to compute() --- .gitignore | 2 - dask_ml/feature_extraction/text.py | 90 ++++++++++++++++-------------- 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index b7707ff9a..4c9132fad 100644 --- a/.gitignore +++ b/.gitignore @@ -122,5 +122,3 @@ docs/source/auto_examples/ docs/source/examples/mydask.png dask-worker-space -/.project -/.pydevproject diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index 169d28d43..43827db0b 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -120,6 +120,18 @@ def _hasher(self): return sklearn.feature_extraction.text.FeatureHasher +def _n_samples(X): + """Count the number of samples in dask.array.Array X.""" + def chunk_n_samples(chunk, axis, keepdims): + return np.array([chunk.shape[0]], dtype=np.int64) + + return da.reduction(X, + chunk=chunk_n_samples, + aggregate=np.sum, + concatenate=False, + dtype=np.int64) + + def _document_frequency(X, dtype): """Count the number of non-zero values for each feature in dask array X.""" def chunk_doc_freq(chunk, axis, keepdims): @@ -133,7 +145,7 @@ def chunk_doc_freq(chunk, axis, keepdims): aggregate=np.sum, axis=0, concatenate=False, - dtype=dtype).compute().astype(dtype) + dtype=dtype) class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): @@ -203,17 +215,19 @@ class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): ['and', 'document', 'first', 'is', 'one', 'second', 'the', 'third', 'this'] """ - def fit_transform(self, raw_documents, y=None): + def get_params(self): # Note that in general 'self' could refer to an instance of either this # class or a subclass of this class. Hence it is possible that # self.get_params() could get unexpected parameters of an instance of a # subclass. Such parameters need to be excluded here: - subclass_instance_params = self.get_params() + subclass_instance_params = super().get_params() excluded_keys = getattr(self, '_non_CountVectorizer_params', []) - params = {key: subclass_instance_params[key] - for key in subclass_instance_params - if key not in excluded_keys} + return {key: subclass_instance_params[key] + for key in subclass_instance_params + if key not in excluded_keys} + def fit_transform(self, raw_documents, y=None): + params = self.get_params() vocabulary = params.pop("vocabulary") vocabulary_for_transform = vocabulary @@ -227,12 +241,12 @@ def fit_transform(self, raw_documents, y=None): # Case 2: learn vocabulary from the data. vocabularies = raw_documents.map_partitions(_build_vocabulary, params) vocabulary = vocabulary_for_transform = ( - _merge_vocabulary( *vocabularies.to_delayed() )) + _merge_vocabulary(*vocabularies.to_delayed())) vocabulary_for_transform = vocabulary_for_transform.persist() vocabulary_ = vocabulary.compute() n_features = len(vocabulary_) - meta = scipy.sparse.eye(0, format="csr", dtype=self.dtype) + meta = scipy.sparse.csr_matrix((0, n_features), dtype=self.dtype) if isinstance(raw_documents, dd.Series): result = raw_documents.map_partitions( _count_vectorizer_transform, vocabulary_for_transform, @@ -241,7 +255,6 @@ def fit_transform(self, raw_documents, y=None): result = raw_documents.map_partitions( _count_vectorizer_transform, vocabulary_for_transform, params) result = build_array(result, n_features, meta) - result.compute_chunk_sizes() self.vocabulary_ = vocabulary_ self.fixed_vocabulary_ = fixed_vocabulary @@ -249,15 +262,7 @@ def fit_transform(self, raw_documents, y=None): return result def transform(self, raw_documents): - # Note that in general 'self' could refer to an instance of either this - # class or a subclass of this class. Hence it is possible that - # self.get_params() could get unexpected parameters of an instance of a - # subclass. Such parameters need to be excluded here: - subclass_instance_params = self.get_params() - excluded_keys = getattr(self, '_non_CountVectorizer_params', []) - params = {key: subclass_instance_params[key] - for key in subclass_instance_params - if key not in excluded_keys} + params = self.get_params() vocabulary = params.pop("vocabulary") if vocabulary is None: @@ -271,14 +276,13 @@ def transform(self, raw_documents): except ValueError: vocabulary_for_transform = dask.delayed(vocabulary) else: - (vocabulary_for_transform,) = client.scatter( - (vocabulary,), broadcast=True - ) + (vocabulary_for_transform,) = client.scatter((vocabulary,), + broadcast=True) else: vocabulary_for_transform = vocabulary n_features = vocabulary_length(vocabulary_for_transform) - meta = scipy.sparse.eye(0, format="csr", dtype=self.dtype) + meta = scipy.sparse.csr_matrix((0, n_features), dtype=self.dtype) if isinstance(raw_documents, dd.Series): result = raw_documents.map_partitions( _count_vectorizer_transform, vocabulary_for_transform, @@ -287,7 +291,6 @@ def transform(self, raw_documents): transformed = raw_documents.map_partitions( _count_vectorizer_transform, vocabulary_for_transform, params) result = build_array(transformed, n_features, meta) - result.compute_chunk_sizes() return result class TfidfTransformer(sklearn.feature_extraction.text.TfidfTransformer): @@ -331,15 +334,10 @@ def fit(self, X, y=None): X : sparse matrix of shape n_samples, n_features) A matrix of term/token counts. """ - # X = check_array(X, accept_sparse=('csr', 'csc')) - # if not sp.issparse(X): - # X = sp.csr_matrix(X) - dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64 - - if self.use_idf: - n_samples, n_features = X.shape + def get_idf_diag(X, dtype): + n_samples = _n_samples(X) # X.shape[0] is not yet known + n_features = X.shape[1] df = _document_frequency(X, dtype) - # df = df.astype(dtype, **_astype_copy_false(df)) # perform idf smoothing if required df += int(self.smooth_idf) @@ -347,14 +345,12 @@ def fit(self, X, y=None): # log+1 instead of log makes sure terms with zero idf don't get # suppressed entirely. - idf = np.log(n_samples / df) + 1 - self._idf_diag = scipy.sparse.diags( - idf, - offsets=0, - shape=(n_features, n_features), - format="csr", - dtype=dtype, - ) + return np.log(n_samples / df) + 1 + + dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64 + + if self.use_idf: + self._idf_diag = get_idf_diag(X, dtype) return self @@ -404,8 +400,17 @@ def _dot_idf_diag(chunk): # idf_ being a property, the automatic attributes detection # does not work as usual and we need to specify the attribute # name: - check_is_fitted(self, attributes=["idf_"], msg="idf vector is not fitted") - + check_is_fitted(self, attributes=["idf_"], + msg="idf vector is not fitted") + if dask.is_dask_collection(self._idf_diag): + _idf_diag = self._idf_diag.compute() + n_features = len(_idf_diag) + self._idf_diag = scipy.sparse.diags( + _idf_diag, + offsets=0, + shape=(n_features, n_features), + format="csr", + dtype=_idf_diag.dtype) X = X.map_blocks(_dot_idf_diag, dtype=np.float64, meta=meta) if self.norm: @@ -619,8 +624,7 @@ def fit(self, raw_documents, y=None): """ self._check_params() self._warn_for_unused_params() - X = super().fit_transform(raw_documents, - y=self._non_CountVectorizer_params) + X = super().fit_transform(raw_documents) self._tfidf.fit(X) return self From 39f9f57a9c620bcc8df15eec0a0dc74c480860b3 Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Tue, 26 Oct 2021 13:28:34 +0200 Subject: [PATCH 5/8] replaced get_params() in CountVectorizer with get_CountVectorizer_params() --- dask_ml/feature_extraction/text.py | 41 ++++++++---- tests/feature_extraction/test_text.py | 92 +++++++++++++++++++-------- 2 files changed, 93 insertions(+), 40 deletions(-) diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index 43827db0b..01f5f9eca 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -215,19 +215,34 @@ class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer): ['and', 'document', 'first', 'is', 'one', 'second', 'the', 'third', 'this'] """ - def get_params(self): - # Note that in general 'self' could refer to an instance of either this - # class or a subclass of this class. Hence it is possible that - # self.get_params() could get unexpected parameters of an instance of a - # subclass. Such parameters need to be excluded here: - subclass_instance_params = super().get_params() - excluded_keys = getattr(self, '_non_CountVectorizer_params', []) - return {key: subclass_instance_params[key] - for key in subclass_instance_params - if key not in excluded_keys} + def get_CountVectorizer_params(self, deep=True): + """ + Get CountVectorizer parameters (names and values) for this + estimator (self), whether it is an instance of CountVectorizer or an + instance of a subclass of CountVectorizer. + + Parameters + ---------- + deep : bool, default=True + If True, will return the CountVectorizer parameters for this + estimator and contained subobjects that are estimators. + + Returns + ------- + params : dict + Parameter names mapped to their values. + """ + out = dict() + for key in CountVectorizer._get_param_names(): + value = getattr(self, key) + if deep and hasattr(value, "get_params"): + deep_items = value.get_params().items() + out.update((key + "__" + k, val) for k, val in deep_items) + out[key] = value + return out def fit_transform(self, raw_documents, y=None): - params = self.get_params() + params = self.get_CountVectorizer_params() vocabulary = params.pop("vocabulary") vocabulary_for_transform = vocabulary @@ -262,7 +277,7 @@ def fit_transform(self, raw_documents, y=None): return result def transform(self, raw_documents): - params = self.get_params() + params = self.get_CountVectorizer_params() vocabulary = params.pop("vocabulary") if vocabulary is None: @@ -532,8 +547,6 @@ def __init__( dtype=dtype, ) - self._non_CountVectorizer_params = ['norm', 'use_idf', - 'smooth_idf', 'sublinear_tf'] self._tfidf = TfidfTransformer( norm=norm, use_idf=use_idf, smooth_idf=smooth_idf, sublinear_tf=sublinear_tf ) diff --git a/tests/feature_extraction/test_text.py b/tests/feature_extraction/test_text.py index 01c50200c..a4c9207c6 100644 --- a/tests/feature_extraction/test_text.py +++ b/tests/feature_extraction/test_text.py @@ -197,29 +197,49 @@ def test_tfidf_vectorizer(distributed, use_idf, smooth_idf, sublinear_tf): - m1 = (sklearn.feature_extraction.text - .TfidfVectorizer(norm=norm, - use_idf=use_idf, - smooth_idf=smooth_idf, - sublinear_tf=sublinear_tf)) + skl1 = (sklearn.feature_extraction.text + .TfidfVectorizer(norm=norm, + use_idf=use_idf, + smooth_idf=smooth_idf, + sublinear_tf=sublinear_tf)) + skl2 = (sklearn.feature_extraction.text + .TfidfVectorizer(norm=norm, + use_idf=use_idf, + smooth_idf=smooth_idf, + sublinear_tf=sublinear_tf)) + + JUNK_FOOD_DOCS_SUBLIST = JUNK_FOOD_DOCS[:2] if collection_type == "Bag": - docs = db.from_sequence(JUNK_FOOD_DOCS, npartitions=2) + full_docs = db.from_sequence(JUNK_FOOD_DOCS, npartitions=2) + sub_docs = db.from_sequence(JUNK_FOOD_DOCS_SUBLIST, npartitions=2) elif collection_type == "Series": - docs = dd.from_pandas(pd.Series(JUNK_FOOD_DOCS), npartitions=2) - r1 = m1.fit_transform(JUNK_FOOD_DOCS) - - m2 = (dask_ml.feature_extraction.text - .TfidfVectorizer(norm=norm, - use_idf=use_idf, - smooth_idf=smooth_idf, - sublinear_tf=sublinear_tf)) + full_docs = dd.from_pandas(pd.Series(JUNK_FOOD_DOCS), npartitions=2) + sub_docs = dd.from_pandas(pd.Series(JUNK_FOOD_DOCS_SUBLIST), + npartitions=2) + + csr_skl1 = skl1.fit_transform(JUNK_FOOD_DOCS) + skl2 = skl2.fit(JUNK_FOOD_DOCS) + csr_skl2 = skl2.transform(JUNK_FOOD_DOCS) + + dml1 = (dask_ml.feature_extraction.text + .TfidfVectorizer(norm=norm, + use_idf=use_idf, + smooth_idf=smooth_idf, + sublinear_tf=sublinear_tf)) + dml2 = (dask_ml.feature_extraction.text + .TfidfVectorizer(norm=norm, + use_idf=use_idf, + smooth_idf=smooth_idf, + sublinear_tf=sublinear_tf)) if distributed: client = Client() # noqa else: client = dummy_context() - r2 = m2.fit_transform(docs) + csr_dml1 = dml1.fit_transform(full_docs) + dml2 = dml2.fit(full_docs) + csr_dml2 = dml2.transform(full_docs) with client: exclude = {"vocabulary_actor_", "stop_words_"} @@ -227,14 +247,34 @@ def test_tfidf_vectorizer(distributed, # idf_ being a property, the automatic attributes detection # does not work as usual so we will exclude it in this case: exclude.add("idf_") - assert_estimator_equal(m1, m2, exclude=exclude) - assert isinstance(r2, da.Array) - assert isinstance(r2._meta, scipy.sparse.csr_matrix) - np.testing.assert_array_almost_equal(r1.toarray(), - r2.compute().toarray()) - - r3 = m2.transform(docs) - assert isinstance(r3, da.Array) - assert isinstance(r3._meta, scipy.sparse.csr_matrix) - np.testing.assert_array_almost_equal(r1.toarray(), - r3.compute().toarray()) + assert_estimator_equal(skl1, dml1, exclude=exclude) + assert isinstance(csr_dml1, da.Array) + assert isinstance(csr_dml1._meta, scipy.sparse.csr_matrix) + np.testing.assert_array_almost_equal(csr_skl1.toarray(), + csr_dml1.compute().toarray()) + + assert_estimator_equal(skl2, dml2, exclude=exclude) + assert isinstance(csr_dml2, da.Array) + assert isinstance(csr_dml2._meta, scipy.sparse.csr_matrix) + np.testing.assert_array_almost_equal(csr_skl2.toarray(), + csr_dml2.compute().toarray()) + + csr_dml1 = dml1.transform(full_docs) + assert isinstance(csr_dml1, da.Array) + assert isinstance(csr_dml1._meta, scipy.sparse.csr_matrix) + np.testing.assert_array_almost_equal(csr_skl1.toarray(), + csr_dml1.compute().toarray()) + + csr_skl1 = skl1.transform(JUNK_FOOD_DOCS_SUBLIST) + csr_dml1 = dml1.transform(sub_docs) + assert isinstance(csr_dml1, da.Array) + assert isinstance(csr_dml1._meta, scipy.sparse.csr_matrix) + np.testing.assert_array_almost_equal(csr_skl1.toarray(), + csr_dml1.compute().toarray()) + + csr_skl1 = skl2.transform(JUNK_FOOD_DOCS_SUBLIST) + csr_dml1 = dml2.transform(sub_docs) + assert isinstance(csr_dml1, da.Array) + assert isinstance(csr_dml1._meta, scipy.sparse.csr_matrix) + np.testing.assert_array_almost_equal(csr_skl1.toarray(), + csr_dml1.compute().toarray()) From c331d9da8447f2a12081f4ae928f5bd86a211c53 Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Tue, 26 Oct 2021 14:35:38 +0200 Subject: [PATCH 6/8] fixed dtype of output --- dask_ml/feature_extraction/text.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index 01f5f9eca..47faf6cc4 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -405,11 +405,12 @@ def _dot_idf_diag(chunk): return chunk * self._idf_diag meta = scipy.sparse.eye(0, format="csr") - if X.dtype != np.float64: - X = X.map_blocks(_astype, dtype=np.float64, meta=meta) + dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64 + if X.dtype != dtype: + X = X.map_blocks(_astype, dtype=dtype, meta=meta) if self.sublinear_tf: - X = X.map_blocks(_one_plus_log, dtype=np.float64, meta=meta) + X = X.map_blocks(_one_plus_log, dtype=dtype, meta=meta) if self.use_idf: # idf_ being a property, the automatic attributes detection @@ -426,11 +427,11 @@ def _dot_idf_diag(chunk): shape=(n_features, n_features), format="csr", dtype=_idf_diag.dtype) - X = X.map_blocks(_dot_idf_diag, dtype=np.float64, meta=meta) + X = X.map_blocks(_dot_idf_diag, dtype=dtype, meta=meta) if self.norm: X = X.map_blocks(_normalize_transform, - dtype=np.float64, + dtype=dtype, norm=self.norm, meta=meta) From 95e11729f46861d1faf08e67b144c84064a10898 Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Wed, 27 Oct 2021 09:07:30 +0200 Subject: [PATCH 7/8] applied multiple inheritance to TfidfVectorizer to shrink source-code --- dask_ml/feature_extraction/text.py | 183 +++++------------------------ 1 file changed, 32 insertions(+), 151 deletions(-) diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index 47faf6cc4..345fccdc3 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -418,15 +418,7 @@ def _dot_idf_diag(chunk): # name: check_is_fitted(self, attributes=["idf_"], msg="idf vector is not fitted") - if dask.is_dask_collection(self._idf_diag): - _idf_diag = self._idf_diag.compute() - n_features = len(_idf_diag) - self._idf_diag = scipy.sparse.diags( - _idf_diag, - offsets=0, - shape=(n_features, n_features), - format="csr", - dtype=_idf_diag.dtype) + self.__compute_idf() X = X.map_blocks(_dot_idf_diag, dtype=dtype, meta=meta) if self.norm: @@ -437,8 +429,34 @@ def _dot_idf_diag(chunk): return X + def __compute_idf(self): + # if _idf_diag is still lazy, then it is computed here + if dask.is_dask_collection(self._idf_diag): + _idf_diag = self._idf_diag.compute() + n_features = len(_idf_diag) + self._idf_diag = scipy.sparse.diags( + _idf_diag, + offsets=0, + shape=(n_features, n_features), + format="csr", + dtype=_idf_diag.dtype) + + @property + def idf_(self): + """Inverse document frequency vector, only defined if `use_idf=True`. + + Returns + ------- + ndarray of shape (n_features,) + """ + self.__compute_idf() + # if _idf_diag is not set, this will raise an attribute error, + # which means hasattr(self, "idf_") is False + return np.ravel(self._idf_diag.sum(axis=0)) + -class TfidfVectorizer(CountVectorizer): +class TfidfVectorizer(sklearn.feature_extraction.text.TfidfVectorizer, + CountVectorizer): r"""Convert a collection of raw documents to a matrix of TF-IDF features. Equivalent to :class:`CountVectorizer` followed by @@ -549,149 +567,12 @@ def __init__( ) self._tfidf = TfidfTransformer( - norm=norm, use_idf=use_idf, smooth_idf=smooth_idf, sublinear_tf=sublinear_tf + norm=norm, + use_idf=use_idf, + smooth_idf=smooth_idf, + sublinear_tf=sublinear_tf ) - # Broadcast the TF-IDF parameters to the underlying transformer instance - # for easy grid search and repr - - @property - def norm(self): - """Norm of each row output, can be either "l1" or "l2".""" - return self._tfidf.norm - - @norm.setter - def norm(self, value): - self._tfidf.norm = value - - @property - def use_idf(self): - """Whether or not IDF re-weighting is used.""" - return self._tfidf.use_idf - - @use_idf.setter - def use_idf(self, value): - self._tfidf.use_idf = value - - @property - def smooth_idf(self): - """Whether or not IDF weights are smoothed.""" - return self._tfidf.smooth_idf - - @smooth_idf.setter - def smooth_idf(self, value): - self._tfidf.smooth_idf = value - - @property - def sublinear_tf(self): - """Whether or not sublinear TF scaling is applied.""" - return self._tfidf.sublinear_tf - - @sublinear_tf.setter - def sublinear_tf(self, value): - self._tfidf.sublinear_tf = value - - @property - def idf_(self): - """Inverse document frequency vector, only defined if `use_idf=True`. - - Returns - ------- - ndarray of shape (n_features,) - """ - return self._tfidf.idf_ - - @idf_.setter - def idf_(self, value): - self._validate_vocabulary() - if hasattr(self, "vocabulary_"): - if len(self.vocabulary_) != len(value): - raise ValueError( - "idf length = %d must be equal to vocabulary size = %d" - % (len(value), len(self.vocabulary)) - ) - self._tfidf.idf_ = value - - def _check_params(self): - if self.dtype not in FLOAT_DTYPES: - warnings.warn( - "Only {} 'dtype' should be used. {} 'dtype' will " - "be converted to np.float64.".format(FLOAT_DTYPES, self.dtype), - UserWarning, - ) - - def fit(self, raw_documents, y=None): - """Learn vocabulary and idf from training set. - - Parameters - ---------- - raw_documents : iterable - An iterable which generates either str, unicode or file objects. - - y : None - This parameter is not needed to compute tfidf. - - Returns - ------- - self : object - Fitted vectorizer. - """ - self._check_params() - self._warn_for_unused_params() - X = super().fit_transform(raw_documents) - self._tfidf.fit(X) - return self - - def fit_transform(self, raw_documents, y=None): - """Learn vocabulary and idf, return document-term matrix. - - This is equivalent to fit followed by transform, but more efficiently - implemented. - - Parameters - ---------- - raw_documents : iterable - An iterable which generates either str, unicode or file objects. - - y : None - This parameter is ignored. - - Returns - ------- - X : sparse matrix of (n_samples, n_features) - Tf-idf-weighted document-term matrix. - """ - self._check_params() - X = super().fit_transform(raw_documents) - self._tfidf.fit(X) - # X is already a transformed view of raw_documents so - # we set copy to False - return self._tfidf.transform(X) - - def transform(self, raw_documents): - """Transform documents to document-term matrix. - - Uses the vocabulary and document frequencies (df) learned by fit (or - fit_transform). - - Parameters - ---------- - raw_documents : iterable - An iterable which generates either str, unicode or file objects. - - Returns - ------- - X : sparse matrix of (n_samples, n_features) - Tf-idf-weighted document-term matrix. - """ - check_is_fitted(self, msg="The TF-IDF vectorizer is not fitted") - - X = super().transform(raw_documents) - return self._tfidf.transform(X, copy=False) - - def _more_tags(self): - return {"X_types": ["string"], "_skip_test": True} - def build_array(bag, n_features, meta): name = "from-bag-" + bag.name From 9cf00fc88eae8ca900b565d5f9d998e4fec18541 Mon Sep 17 00:00:00 2001 From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com> Date: Fri, 29 Oct 2021 10:11:01 +0200 Subject: [PATCH 8/8] fixed dtype in TfidfTransformer --- dask_ml/feature_extraction/text.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py index 345fccdc3..fd1fe02ca 100644 --- a/dask_ml/feature_extraction/text.py +++ b/dask_ml/feature_extraction/text.py @@ -391,23 +391,23 @@ def transform(self, X, copy=True): # if not sp.issparse(X): # X = sp.csr_matrix(X, dtype=np.float64) - def _astype(chunk): - return chunk.astype(np.float64, copy=True) + def _astype(chunk, Xdtype=np.float64): + return chunk.astype(Xdtype, copy=True) def _one_plus_log(chunk): # transforms nonzero elements x of csr_matrix: x -> 1 + log(x) c = chunk.copy() - c.data = np.log(chunk.data, dtype=np.float64) + c.data = np.log(chunk.data, dtype=chunk.data.dtype) c.data += 1 return c def _dot_idf_diag(chunk): return chunk * self._idf_diag - meta = scipy.sparse.eye(0, format="csr") dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64 + meta = scipy.sparse.eye(0, format="csr", dtype=dtype) if X.dtype != dtype: - X = X.map_blocks(_astype, dtype=dtype, meta=meta) + X = X.map_blocks(_astype, Xdtype=dtype, dtype=dtype, meta=meta) if self.sublinear_tf: X = X.map_blocks(_one_plus_log, dtype=dtype, meta=meta)