emm.helper package

Submodules

emm.helper.blocking_functions module

List of blocking functions.

Their names are used to name indexers. Please don’t modify the function names.

emm.helper.blocking_functions.first(x)

First character blocking function.

Parameters:

x (str)

Return type:

str

emm.helper.blocking_functions.first2(x)

First two characters blocking function.

Parameters:

x (str)

Return type:

str

emm.helper.blocking_functions.first3(x)

First three characters blocking function.

Parameters:

x (str)

Return type:

str
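
A minimal usage sketch of the three blocking functions; the example name is arbitrary and the exact key values (e.g. any lowercasing) depend on the implementation:

>>> from emm.helper import blocking_functions
>>> key1 = blocking_functions.first("Tesla")   # first character
>>> key2 = blocking_functions.first2("Tesla")  # first two characters
>>> key3 = blocking_functions.first3("Tesla")  # first three characters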

emm.helper.custom_path module

class emm.helper.custom_path.CustomPath(*args, **kwargs)

Bases: PosixPath

Custom wrapper for Path class to keep any first double slash

By default Path supports file paths. However, in practice there are URI schemes that are used to refer to paths and for which PurePath manipulations are desirable (in this case S3). To accommodate this functionality, the Path class is extended to detect, capture and store the scheme as an attribute. The remainder of the URI is treated as a local path.

This approach obviously has its limitations, and handling them is the responsibility of the user. If your use case requires host, port and/or credential information, use proper URI parsing instead.

In practice CustomPath acts just like the normal Path class for local files. However, it prevents the replacement of the first-encountered // by / that Path performs. This makes it possible to use Path not only for local paths but also for e.g. hdfs or s3 paths.

Example: ‘s3://foo/bar/bla’ => ‘s3://foo/bar/bla’ (and not ‘s3:/foo/bar/bla’)

This makes it possible to reuse basic Path string manipulation, e.g. of subdirectories, for files on s3. In particular, new_path = path / 'foo' and str(path) work correctly.

For more complex functions, check whether CustomPath works; if not, use the CustomPath.is_local flag and write an alternative.

Suggestions taken from:
https://stackoverflow.com/questions/61689391/error-with-simple-subclassing-of-pathlib-path-no-flavour-attribute
https://stackoverflow.com/questions/49078156/use-pathlib-for-s3-paths

Other resources:
https://en.wikipedia.org/wiki/List_of_URI_schemes
https://en.wikipedia.org/wiki/File_URI_scheme
https://docs.aws.amazon.com/cli/latest/reference/s3/

Args:

Same as Path.
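
A minimal sketch of the behaviour described above; the s3 path is hypothetical, and is_local returning False for a non-local scheme is an assumption:

>>> from emm.helper.custom_path import CustomPath
>>> path = CustomPath("s3://bucket/data")
>>> new_path = path / "model.joblib"  # basic Path manipulations still work
>>> s = str(new_path)                 # 's3://bucket/data/model.joblib', the '//' is kept
>>> local = new_path.is_local         # expected False for an s3 path (assumption)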

as_uri()

Return the path as a ‘file’ URI.

property is_local

emm.helper.io module

class emm.helper.io.IOFunc

Bases: object

Reader and writer functions used inside SparkCustomWriter/Reader classes

Container with reading and writing functions. Used for reading and storage of non-spark objects. By default these are set to joblib's load and dump functions.

Note: reader and writer are global attributes, so they get picked up by all classes that use IOFunc, and only need to be set once.

Examples:
>>> io = IOFunc()
>>> io.writer = pickle.dump
>>> io.reader = pickle.load

property reader

set_reader(func, call_inside_joblib_load=False, call_inside_pickle_load=False)

Set the reader function

Args:

func: input reader function.
call_inside_joblib_load: if true, set the reader function as: joblib.load(func(path)).
call_inside_pickle_load: if true, set the reader function as: pickle.load(func(path)).

property writer
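
A hedged sketch of set_reader; open_binary is a hypothetical helper, and with call_inside_joblib_load=True the effective reader presumably becomes joblib.load(open_binary(path)), per the description above:

>>> from emm.helper.io import IOFunc
>>> def open_binary(path):
...     return open(path, "rb")
>>> io = IOFunc()
>>> io.set_reader(open_binary, call_inside_joblib_load=True)
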
emm.helper.io.load_joblib(file_path, directory=None)

Load object from (possibly compressed) joblib file

Args:

file_path: full file path, or optionally only the file name using the additional directory argument.
directory: directory corresponding to the file name, which is then joined with the file name (optional).

Parameters:
  • file_path (str | Path)

  • directory (Union[str, Path, None])

Return type:

object

emm.helper.io.load_pickle(file_path, directory=None)

Load object from pickle file

Args:

file_path: full file path, or optionally only the file name using the additional directory argument.
directory: directory corresponding to the file name, which is then joined with the file name (optional).

Parameters:
  • file_path (str | Path)

  • directory (Union[str, Path, None])

Return type:

object
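
A minimal usage sketch for the two loaders; the file names and directory are hypothetical:

>>> from emm.helper.io import load_joblib, load_pickle
>>> model = load_joblib("/tmp/model.joblib")               # full file path
>>> model = load_joblib("model.joblib", directory="/tmp")  # file name plus directory
>>> obj = load_pickle("data.pkl", directory="/tmp")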

emm.helper.io.save_file(file_path, obj, dump_func=<built-in function dump>, **kwargs)
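
A hedged usage sketch, assuming obj is written to file_path with dump_func and that extra keyword arguments are forwarded to the dump call; the file path and object are hypothetical:

>>> import pickle
>>> from emm.helper.io import save_file
>>> save_file("/tmp/obj.pkl", {"name": "Tesla"}, dump_func=pickle.dump)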

emm.helper.sklearn_pipeline module

class emm.helper.sklearn_pipeline.SklearnPipelineWrapper(steps, *, memory=None, verbose=False)

Bases: Pipeline

Wrapper for sklearn Pipeline, adds support for extra options in transform

set_score_request(*, sample_weight: bool | None | str = '$UNCHANGED$') SklearnPipelineWrapper

Request metadata passed to the score method.

Note that this method is only relevant if enable_metadata_routing=True (see sklearn.set_config()). Please see User Guide on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to score if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to score.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version 1.3.

Note

This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a Pipeline. Otherwise it has no effect.

Parameters

sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED

Metadata routing for sample_weight parameter in score.

Returns

self : object

The updated object.

transform(X, **transform_options)

Transform the data, and apply transform with the final estimator.

Call transform of each transformer in the pipeline. The transformed data are finally passed to the final estimator that calls transform method. Only valid if the final estimator implements transform.

This also works where final estimator is None in which case all prior transformations are applied.

Parameters

X : iterable

Data to transform. Must fulfill input requirements of first step of the pipeline.

**params : dict of str -> object

Parameters requested and accepted by steps. Each step must have requested certain metadata for these parameters to be forwarded to them.

Added in version 1.4: Only available if enable_metadata_routing=True. See Metadata Routing User Guide for more details.

Returns

Xt : ndarray of shape (n_samples, n_transformed_features)

Transformed data.
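
A minimal sketch of using the wrapper as a drop-in sklearn Pipeline; the TfidfVectorizer step is an arbitrary example and the emm-specific extra transform options are not shown:

>>> from sklearn.feature_extraction.text import TfidfVectorizer
>>> from emm.helper.sklearn_pipeline import SklearnPipelineWrapper
>>> pipe = SklearnPipelineWrapper([("tfidf", TfidfVectorizer())])
>>> pipe = pipe.fit(["apple inc", "tesla motors"])
>>> X = pipe.transform(["tesla inc"])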

emm.helper.spark_custom_reader_writer module

emm.helper.spark_ml_pipeline module

emm.helper.spark_utils module

emm.helper.spark_utils.add_uid_column(sdf, uid_col='uid')

monotonically_increasing_id() is recalculated during transform and gives different values for the same rows. Therefore the DataFrame needs to be saved temporarily, for example with checkpointing.
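
A minimal usage sketch; the toy DataFrame is hypothetical and 'uid' is the default column name:

>>> from pyspark.sql import SparkSession
>>> from emm.helper.spark_utils import add_uid_column
>>> spark = SparkSession.builder.getOrCreate()
>>> sdf = spark.createDataFrame([("tesla",), ("apple",)], ["name"])
>>> sdf = add_uid_column(sdf, uid_col="uid")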

emm.helper.spark_utils.auto_repartitioning(sdf, partition_size, *cols)

Repartition a Spark DataFrame so that it has 'partition_size' rows per partition. If partition_size is None, no repartitioning is done. Returns the repartitioned dataframe and the size of the dataset.

Parameters:
  • sdf (DataFrame)

  • partition_size (int | None)
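
Continuing the sdf sketch above, a hedged usage example; the return order (DataFrame, then row count) follows the description:

>>> from emm.helper.spark_utils import auto_repartitioning
>>> sdf, n_rows = auto_repartitioning(sdf, 10_000)  # roughly 10_000 rows per partition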

emm.helper.spark_utils.check_uid(sdf, uid_col)

Check if the uid column is present and add it if missing

emm.helper.spark_utils.logical_repartitioning(df, column, num_partitions=None, spark=None)

Make sure that all candidates of a names_to_match (uid) are in the same partition; this is needed for computing the rank feature in the pandas UDF. The repartitioning needs to happen after the computation of the missing feature, most probably because the vectorizer does some repartitioning of its own. This is needed both for logical correctness and for Spark data/execution parallelism.

repartition(k, col) creates a dataframe with k partitions using a hash-based partitioner on col. It repartitions with the same number of partitions as before, so this relates to spark.sql.shuffle.partitions and spark.default.parallelism, or repartitions as a function of partition_size.

Parameters:
  • df (DataFrame)

  • column (str)

  • num_partitions (int | None)

  • spark (SparkSession | None)

Return type:

DataFrame
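
A hedged usage sketch, reusing the sdf and spark session from the sketches above; the 'uid' column and partition count are illustrative:

>>> from emm.helper.spark_utils import logical_repartitioning
>>> sdf = logical_repartitioning(sdf, column="uid", num_partitions=200, spark=spark)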

emm.helper.spark_utils.set_partitions(num_partitions, spark=None)
Parameters:
  • num_partitions (int)

  • spark (SparkSession | None)

Return type:

None

emm.helper.spark_utils.set_spark_job_group(*args, spark=None, **kwargs)

Label the spark job group

Args:

spark: spark session (optional)
*args: args to pass to setJobGroup
**kwargs: kwargs to pass to setJobGroup

Parameters:

spark (SparkSession | None)

Return type:

None
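
A minimal sketch, reusing the spark session from the sketches above; the positional arguments (a group id and a description in this example) are forwarded to setJobGroup:

>>> from emm.helper.spark_utils import set_spark_job_group
>>> set_spark_job_group("entity-matching", "candidate generation", spark=spark)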

emm.helper.spark_utils.spark_checkpoint(sdf, spark=None)
Parameters:
  • sdf (DataFrame)

  • spark (SparkSession | None)

Return type:

DataFrame

emm.helper.util module

Helper function for name matching model save and load

emm.helper.util.get_model_title(params)

Construct model title from parameter settings

Extract model title based on model’s indexer settings

Args:

params: model parameters

Parameters:

params (dict)

Return type:

str

emm.helper.util.groupby(data, groups, postprocess_func=None)

Aggregates data using grouping values from groups. Returns a dictionary with keys from groups and lists of matching values from data. If a postprocessing function is defined, all dictionary values are processed with this function.

Parameters:
  • data (Iterable)

  • groups (Iterable)

  • postprocess_func (Optional[Callable])

Return type:

Mapping
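
A small sketch of the described behaviour; the exact mapping type returned may differ:

>>> from emm.helper.util import groupby
>>> data = ["apple inc", "apple corp", "tesla motors"]
>>> groups = ["apple", "apple", "tesla"]
>>> grouped = groupby(data, groups)
>>> # expected: {"apple": ["apple inc", "apple corp"], "tesla": ["tesla motors"]}
>>> counts = groupby(data, groups, postprocess_func=len)
>>> # expected: {"apple": 2, "tesla": 1}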

emm.helper.util.indexers_set_values(default_indexer_params, indexers)

Helper function to update indexer settings

Update indexer settings with default values where values are missing. Used when initializing indexers and in parameters.py.

Args:

default_indexer_params: dict with default indexer settings.
indexers: dict with indexer settings that should be updated.

Parameters:
  • default_indexer_params (list[Mapping[str, Any]])

  • indexers (list[Mapping[str, Any]])

Return type:

list[Mapping[str, Any]]

emm.helper.util.rename_columns(df, mapping)

Rename columns of Pandas or Spark DataFrame according to the mapping
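
A minimal sketch, assuming mapping is a dict from old to new column names:

>>> import pandas as pd
>>> from emm.helper.util import rename_columns
>>> df = pd.DataFrame({"name": ["tesla"], "id": [1]})
>>> df = rename_columns(df, {"name": "company_name"})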

emm.helper.util.string_columns_to_pyarrow(df, columns=None)

Convert string columns to pyarrow string datatype

The pyarrow string datatype is much more memory efficient; this is important for large lists of names (1M+).

Args:

df: input pandas dataframe to convert.
columns: columns to convert to pyarrow string type. If None, pick known relevant columns.

Returns:

converted dataframe

Parameters:
  • df (DataFrame)

  • columns (Optional[list])

Return type:

DataFrame
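
A minimal sketch; the column selection is illustrative, and with columns=None the function picks known relevant columns itself:

>>> import pandas as pd
>>> from emm.helper.util import string_columns_to_pyarrow
>>> df = pd.DataFrame({"name": ["Apple Inc", "Tesla Motors"], "id": [1, 2]})
>>> df = string_columns_to_pyarrow(df, columns=["name"])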

Module contents