# Source code for ds_utils.strings
"""String manipulation utilities for data science tasks."""
import re
from collections import Counter
from typing import List, Tuple, Optional, Callable, Union
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import MultiLabelBinarizer
def _tokenize(text_tags: str) -> List[str]:
tags = text_tags.split(",")
tags = [re.sub(r"[^a-zA-Z0-9_$-]", "", x) for x in tags]
tags = [x.strip() for x in tags]
tags = [x for x in tags if x] # More concise than checking length
return tags
def _normalize_tags(value, tokenizer, lowercase):
"""Normalize tag input to a list of strings.
Handles both string inputs (which need tokenization) and list inputs
(which are already tokenized).
:param value: Either a string to tokenize or a list of tags
:param tokenizer: Tokenizer function to use for string inputs
:param lowercase: Whether to convert to lowercase
:return: List of normalized tag strings
"""
tags = []
if isinstance(value, str):
if value: # non-empty string
tags = tokenizer(value)
elif isinstance(value, list):
tags = value
# Apply lowercase if requested
if lowercase:
tags = [tag.lower() if isinstance(tag, str) else str(tag).lower() for tag in tags]
return tags
# [docs]
def append_tags_to_frame(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    field_name: str,
    prefix: str = "",
    max_features: Optional[int] = 500,
    min_df: Union[int, float] = 1,
    lowercase: bool = False,
    sparse: bool = False,
    tokenizer: Optional[Callable[[str], List[str]]] = _tokenize,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Extract tags from a column and append them as binarized features to the dataframe.

    This function processes a specified column in the train and test dataframes that contains tags.
    It supports columns with either string-based tags (e.g., "tag1,tag2") or list-based tags
    (e.g., ["tag1", "tag2"]). The function identifies a vocabulary of tags from the training data,
    filters them based on frequency, and then creates new binary columns for each tag.

    Supported Input Types for the Tags Column:
        - str: Comma-separated tags. The default tokenizer splits by comma, trims whitespace, and removes
          non-alphanumeric characters (except "_", "$", "-"). Empty strings are treated as having no tags.
        - List[str]: A pre-tokenized list of tags. Empty lists are treated as having no tags.
        - NaN/None: Handled as empty.

    Tokenization Rules (for string inputs):
        - The default tokenizer splits the input string by commas (",").
        - Whitespace around tags is automatically trimmed.
        - Duplicate tags within the same string (e.g., "tag1,tag1") are treated as a single occurrence
          for that row.
        - Casing is preserved unless `lowercase=True`.

    `min_df` Behavior:
        - This parameter filters out tags that are not frequent enough in the training data.
        - If integral (``int`` or any ``numbers.Integral`` such as a NumPy integer): the absolute minimum
          number of rows a tag must appear in to be included.
        - If `float` (between 0.0 and 1.0): The minimum fraction of rows a tag must appear in.
        - This filtering is applied *before* the final vocabulary is selected and binarized.

    Column Naming Logic:
        - The `prefix` argument is prepended to each tag to form the new column names.
        - Example: With `prefix="tag_"` and a tag "python", the resulting column will be "tag_python".

    Column Ordering:
        - The generated tag columns are always sorted alphabetically, ensuring a deterministic and stable
          order that can be relied upon for feature alignment in downstream modeling.

    Row Alignment:
        - Tag columns are attached to their source rows by position and the original index is restored
          afterwards, so duplicate index labels never multiply rows.

    :param X_train: Pandas DataFrame with the train features.
    :param X_test: Pandas DataFrame with the test features.
    :param field_name: The name of the column to parse for tags.
    :param prefix: A string prefix for the new binarized tag columns.
    :param max_features: The maximum number of tags to include, based on frequency. Default is 500.
        ``None`` keeps every tag that passes `min_df`.
    :param min_df: The minimum document frequency for a tag to be included. Can be an int or a float. Default is 1.
    :param lowercase: If True, all tags are converted to lowercase. Default is False.
    :param sparse: If True, returns a DataFrame with sparse columns. Default is False.
    :param tokenizer: A custom function to tokenize string inputs. Defaults to an internal tokenizer;
        ``None`` also selects the internal tokenizer.
    :return: A tuple containing the transformed train and test DataFrames. When `X_train` is empty,
        two empty DataFrames are returned.
    :raise KeyError: If `field_name` is not in the input dataframes (and `X_train` is not empty).
    """
    # Function-scope import keeps this block self-contained; only needed for the min_df type check.
    import numbers

    if X_train.empty:
        return pd.DataFrame(), pd.DataFrame()

    # The signature advertises Optional, so None must mean "use the default tokenizer".
    if tokenizer is None:
        tokenizer = _tokenize

    def to_tags(cell):
        return _normalize_tags(cell, tokenizer, lowercase)

    # Tokenize the training data (handles both strings and lists).
    train_tags = X_train[field_name].fillna("").apply(to_tags)

    # Document frequency: each tag counts at most once per row.
    doc_freq = Counter(tag for tags_list in train_tags for tag in set(tags_list))

    # Integral min_df is an absolute row count; anything else is a fraction of the training rows.
    # numbers.Integral (rather than int) also recognises NumPy integer scalars.
    if isinstance(min_df, numbers.Integral):
        min_doc_count = min_df
    else:
        min_doc_count = min_df * len(X_train)
    tags_to_keep = {tag for tag, freq in doc_freq.items() if freq >= min_doc_count}

    if max_features is not None:
        # Most frequent first; ties are broken alphabetically so the selection is deterministic.
        top_tags = sorted(tags_to_keep, key=lambda tag: (-doc_freq[tag], tag))[:max_features]
        tags_to_keep = set(top_tags)

    def keep_known(tags):
        return [tag for tag in tags if tag in tags_to_keep]

    # Fixing `classes` gives alphabetically ordered columns shared by train and test.
    mlb = MultiLabelBinarizer(classes=sorted(tags_to_keep), sparse_output=sparse)
    x_train_binarized = mlb.fit_transform(train_tags.apply(keep_known))

    # Test rows are pre-filtered to the training vocabulary before being transformed.
    test_tags = X_test[field_name].fillna("").apply(to_tags)
    x_test_binarized = mlb.transform(test_tags.apply(keep_known))

    feature_names = [prefix + tag_name for tag_name in mlb.classes_]
    if sparse:
        x_train_tags = pd.DataFrame.sparse.from_spmatrix(
            x_train_binarized, index=X_train.index, columns=feature_names
        )
        x_test_tags = pd.DataFrame.sparse.from_spmatrix(
            x_test_binarized, index=X_test.index, columns=feature_names
        )
    else:
        x_train_tags = pd.DataFrame(x_train_binarized, columns=feature_names, index=X_train.index)
        x_test_tags = pd.DataFrame(x_test_binarized, columns=feature_names, index=X_test.index)

    return (
        _attach_tag_columns(X_train.drop(columns=[field_name]), x_train_tags),
        _attach_tag_columns(X_test.drop(columns=[field_name]), x_test_tags),
    )


def _attach_tag_columns(frame: pd.DataFrame, tag_frame: pd.DataFrame) -> pd.DataFrame:
    """Join ``tag_frame`` onto ``frame`` row by row, keeping ``frame``'s index.

    Both frames must hold the same rows in the same order. Merging on a
    throwaway positional index (instead of the real one) keeps the usual merge
    semantics while preventing duplicate index labels from producing a
    cartesian product of rows.
    """
    combined = pd.merge(
        frame.reset_index(drop=True),
        tag_frame.reset_index(drop=True),
        left_index=True,
        right_index=True,
        how="left",
    )
    combined.index = frame.index
    return combined
# [docs]
def extract_significant_terms_from_subset(
    data_frame: pd.DataFrame,
    subset_data_frame: pd.DataFrame,
    field_name: str,
    vectorizer: "Optional[CountVectorizer]" = None,
) -> pd.Series:
    """Return interesting or unusual occurrences of terms in a subset.

    Each term is scored as ``subset_count / (superset_count - subset_count + 1)``,
    where the counts are total token occurrences of the term in
    ``subset_data_frame[field_name]`` and ``data_frame[field_name]`` respectively.

    Based on the elasticsearch significant_text aggregation:
    https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-significantterms-aggregation.html#_scripted

    :param data_frame: The full dataset.
    :param subset_data_frame: The subset partition data over which the scoring will be calculated.
        It can be filtered by feature or other boolean criteria.
    :param field_name: The feature to parse.
    :param vectorizer: Text count vectorizer which converts a collection of text to a matrix of token counts.
        It is (re)fitted on ``data_frame[field_name]``. When omitted, a fresh
        ``CountVectorizer(encoding="utf-8", lowercase=True, max_features=500)`` is created for this call.
        See more info here:
        https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.CountVectorizer.html
    :return: Series of terms with scoring over the subset, sorted from highest to lowest score.
        An empty float64 Series when ``data_frame`` is empty.

    :author: Eran Hirsch (https://github.com/eranhirs)
    """
    if data_frame.empty:
        # Explicit dtype: matches the float scores of the normal path and avoids
        # the version-dependent default dtype of a bare ``pd.Series()``.
        return pd.Series(dtype="float64")

    if vectorizer is None:
        # Built per call so no fitted state is shared between invocations.
        vectorizer = CountVectorizer(encoding="utf-8", lowercase=True, max_features=500)

    # Vocabulary and superset counts come from the full dataset; missing values are skipped.
    count_matrix = vectorizer.fit_transform(data_frame[field_name].dropna())
    terms = vectorizer.get_feature_names_out()
    superset_freq = pd.DataFrame(count_matrix.toarray(), columns=terms).sum()

    # The subset is counted against that same vocabulary.
    subset_matrix = vectorizer.transform(subset_data_frame[field_name].dropna())
    subset_freq = pd.DataFrame(subset_matrix.toarray(), columns=terms).sum()

    # The +1 keeps the denominator non-zero when a term occurs only in the subset.
    return (subset_freq / (superset_freq - subset_freq + 1)).sort_values(ascending=False)