emm.helper package
Submodules
emm.helper.blocking_functions module
List of blocking functions.
Their names are used to name indexers. Please don’t modify the function names.
- emm.helper.blocking_functions.first(x)
First character blocking function.
- Parameters:
x (str)
- Return type:
str
- emm.helper.blocking_functions.first2(x)
First two characters blocking function.
- Parameters:
x (str)
- Return type:
str
- emm.helper.blocking_functions.first3(x)
First three characters blocking function.
- Parameters:
x (str)
- Return type:
str
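A minimal sketch of what these blocking functions might look like, assuming each one simply returns the leading characters of the input string (the real implementation may, for example, normalize the string first). Blocking means that only names sharing the same key are compared with each other; the hypothetical helper block_candidates shows the effect on a small list.

```python
from collections import defaultdict
from typing import Callable, Dict, List


# Assumed behaviour: return the first 1, 2 or 3 characters of the string.
def first(x: str) -> str:
    return x[:1]


def first2(x: str) -> str:
    return x[:2]


def first3(x: str) -> str:
    return x[:3]


def block_candidates(names: List[str], key_func: Callable[[str], str]) -> Dict[str, List[str]]:
    """Hypothetical helper: group names by their blocking key.

    Only names inside the same block would be compared with each other.
    """
    blocks: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        blocks[key_func(name)].append(name)
    return dict(blocks)


names = ["acme corp", "acme inc", "abc bv", "globex"]
print(block_candidates(names, first))
# {'a': ['acme corp', 'acme inc', 'abc bv'], 'g': ['globex']}
print(block_candidates(names, first3))
# {'acm': ['acme corp', 'acme inc'], 'abc': ['abc bv'], 'glo': ['globex']}
```

A shorter key such as first gives larger blocks and therefore more candidate pairs, while first3 gives smaller, more selective blocks.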
emm.helper.custom_path module
- class emm.helper.custom_path.CustomPath(*args, **kwargs)
Bases: PosixPath
Custom wrapper for Path class to keep any first double slash.
By default Path supports file paths. However, in practice there are URI schemes that are used to refer to paths and for which PurePath manipulations are desirable (in this case S3). To accommodate this, the Path class is extended to detect, capture and store the scheme as an attribute. The remainder of the URI is treated as a local path.
This approach has its limitations, and dealing with them is the responsibility of the user. If your use case requires host, port and/or credential information, you should use proper URI parsing.
In practice CustomPath acts just like the normal Path class for any local files. However, it prevents the replacement of the first-encountered // by /, which happens in Path. This makes it possible to also use Path for, e.g., HDFS or S3 paths, not just local ones.
Example: ‘s3://foo/bar/bla’ => ‘s3://foo/bar/bla’ (and not ‘s3:/foo/bar/bla’)
This makes it possible to reuse basic Path string manipulation, e.g. of subdirectories, for files on S3. In particular, one can correctly do: new_path = path / ‘foo’, and str(path).
For more complex functions, check whether CustomPath works; otherwise, use the CustomPath.is_local flag and write an alternative.
Suggestions taken from: https://stackoverflow.com/questions/61689391/error-with-simple-subclassing-of-pathlib-path-no-flavour-attribute https://stackoverflow.com/questions/49078156/use-pathlib-for-s3-paths
Other Resources: https://en.wikipedia.org/wiki/List_of_URI_schemes https://en.wikipedia.org/wiki/File_URI_scheme https://docs.aws.amazon.com/cli/latest/reference/s3/
- Args:
Same as Path.
- as_uri()
Return the path as a ‘file’ URI.
- property is_local
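The slash-collapsing problem and the scheme-capturing idea can be shown with the standard library alone. The class below is a hypothetical illustration, not the emm implementation: it does not subclass Path (which is fragile across Python versions), it simply splits off a "scheme://" prefix and delegates the rest to PurePosixPath. Treating a path as local exactly when no scheme is present is an assumption.

```python
from pathlib import PurePosixPath

# Plain pathlib collapses the '//' after the scheme.
print(PurePosixPath("s3://foo/bar/bla"))  # s3:/foo/bar/bla


class SchemePath:
    """Hypothetical illustration (not the emm implementation): keep 'scheme://'
    intact and reuse PurePosixPath for the remainder of the URI."""

    def __init__(self, path: str) -> None:
        scheme, sep, rest = path.partition("://")
        # Only treat the prefix as a scheme if '://' was actually found.
        self.scheme = scheme if sep else ""
        self._path = PurePosixPath(rest if sep else path)

    @property
    def is_local(self) -> bool:
        # Assumption: no scheme means a local path.
        return self.scheme == ""

    def __truediv__(self, other: str) -> "SchemePath":
        joined = self._path / other
        return SchemePath(f"{self.scheme}://{joined}" if self.scheme else str(joined))

    def __str__(self) -> str:
        return f"{self.scheme}://{self._path}" if self.scheme else str(self._path)


s3_path = SchemePath("s3://foo/bar") / "bla"
print(s3_path, s3_path.is_local)  # s3://foo/bar/bla False
print(SchemePath("/tmp/data") / "bla")  # /tmp/data/bla
```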
emm.helper.io module
- class emm.helper.io.IOFunc
Bases: object
Reader and writer functions used inside the SparkCustomWriter/Reader classes.
Container with reading and writing functions. Used for reading and storing non-Spark objects. By default these are set to joblib’s load and dump functions.
Note: reader and writer are global attributes, so they get picked up by all classes that use IOFunc, and only need to be set once.
- Examples:
>>> import pickle
>>> io = IOFunc()
>>> io.writer = pickle.dump
>>> io.reader = pickle.load
- property reader
- set_reader(func, call_inside_joblib_load=False, call_inside_pickle_load=False)
Set the reader function.
- Args:
func: input reader function
call_inside_joblib_load: if true, set the reader function as: joblib.load(func(path)).
call_inside_pickle_load: if true, set the reader function as: pickle.load(func(path)).
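The two flags compose the supplied function with a loader. A minimal sketch of the pickle case, assuming func opens a resource and returns a binary file-like object; make_reader and open_from_store are hypothetical names, and the joblib variant would use joblib.load in the same position.

```python
import io
import pickle
from typing import Any, Callable


def make_reader(func: Callable[[str], Any], call_inside_pickle_load: bool = False) -> Callable[[str], Any]:
    """Hypothetical helper mirroring the documented composition pickle.load(func(path))."""
    if call_inside_pickle_load:
        # func only opens the resource; pickle.load deserializes its content.
        return lambda path: pickle.load(func(path))
    # Otherwise func itself is expected to return the loaded object.
    return func


# In-memory stand-in for a remote store (e.g. a client returning byte streams).
store = {"model.pkl": pickle.dumps({"a": 1})}


def open_from_store(path: str) -> io.BytesIO:
    return io.BytesIO(store[path])


reader = make_reader(open_from_store, call_inside_pickle_load=True)
print(reader("model.pkl"))  # {'a': 1}
```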
- property writer
- emm.helper.io.load_joblib(file_path, directory=None)
Load object from (possibly compressed) joblib file.
- Args:
file_path: full file path, or optionally only the file name when using the additional directory argument.
directory: directory corresponding to the file name, which is then joined with the file name (optional).
- Parameters:
file_path (str | Path)
directory (Union[str, Path, None])
- Return type:
object
- emm.helper.io.load_pickle(file_path, directory=None)
Load object from pickle file.
- Args:
file_path: full file path, or optionally only the file name when using the additional directory argument.
directory: directory corresponding to the file name, which is then joined with the file name (optional).
- Parameters:
file_path (str | Path)
directory (Union[str, Path, None])
- Return type:
object
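The file_path/directory convention shared by load_joblib and load_pickle can be sketched for local files as follows. load_pickle_sketch is a hypothetical stand-in; the real functions may also support remote paths, which this sketch does not.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Optional, Union


def load_pickle_sketch(file_path: Union[str, Path], directory: Optional[Union[str, Path]] = None) -> Any:
    # With a directory, file_path is only the file name and is joined to it.
    path = Path(directory) / file_path if directory is not None else Path(file_path)
    with open(path, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    with open(Path(tmp) / "obj.pkl", "wb") as f:
        pickle.dump([1, 2, 3], f)
    by_name = load_pickle_sketch("obj.pkl", directory=tmp)
    by_full_path = load_pickle_sketch(Path(tmp) / "obj.pkl")

print(by_name == by_full_path == [1, 2, 3])  # True
```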
- emm.helper.io.save_file(file_path, obj, dump_func=<built-in function dump>, **kwargs)
emm.helper.sklearn_pipeline module
- class emm.helper.sklearn_pipeline.SklearnPipelineWrapper(steps, *, memory=None, verbose=False)
Bases: Pipeline
Wrapper for sklearn Pipeline; adds support for extra options in transform.
- set_score_request(*, sample_weight: bool | None | str = '$UNCHANGED$') → SklearnPipelineWrapper
Request metadata passed to the score method.
Note that this method is only relevant if enable_metadata_routing=True (see sklearn.set_config()). Please see the User Guide on how the routing mechanism works.
The options for each parameter are:
- True: metadata is requested, and passed to score if provided. The request is ignored if metadata is not provided.
- False: metadata is not requested and the meta-estimator will not pass it to score.
- None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- str: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.
Added in version 1.3.
Note
This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a Pipeline. Otherwise it has no effect.
Parameters
- sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
Metadata routing for sample_weight parameter in score.
Returns
- self : object
The updated object.
- transform(X, **transform_options)
Transform the data, and apply transform with the final estimator.
Call transform of each transformer in the pipeline. The transformed data are finally passed to the final estimator that calls transform method. Only valid if the final estimator implements transform.
This also works where final estimator is None in which case all prior transformations are applied.
Parameters
- X : iterable
Data to transform. Must fulfill input requirements of first step of the pipeline.
- **params : dict of str -> object
Parameters requested and accepted by steps. Each step must have requested certain metadata for these parameters to be forwarded to them.
Added in version 1.4: Only available if enable_metadata_routing=True. See Metadata Routing User Guide for more details.
Returns
- Xt : ndarray of shape (n_samples, n_transformed_features)
Transformed data.
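The chaining described above can be illustrated without scikit-learn in a minimal pure-Python sketch. pipeline_transform, Lower and Strip are hypothetical, and forwarding every keyword option to every step is only an assumption about how extra transform options could be passed; it is not the documented behaviour of SklearnPipelineWrapper.

```python
from typing import Any, Optional, Sequence, Tuple


def pipeline_transform(steps: Sequence[Tuple[str, Any]], X: Any, **transform_options: Any) -> Any:
    """Hypothetical sketch: call transform of each step in order.

    Steps that are None or 'passthrough' are skipped, so a None final
    estimator simply returns the output of the prior transformations.
    Assumption: every step receives the same keyword options.
    """
    Xt = X
    for _name, step in steps:
        if step is None or step == "passthrough":
            continue
        Xt = step.transform(Xt, **transform_options)
    return Xt


class Lower:
    def transform(self, X, **options):
        # Ignores options it does not use.
        return [x.lower() for x in X]


class Strip:
    def transform(self, X, strip_chars: Optional[str] = None, **options):
        return [x.strip(strip_chars) for x in X]


steps = [("lower", Lower()), ("strip", Strip()), ("final", None)]
print(pipeline_transform(steps, ["  ACME Corp. "], strip_chars=" ."))  # ['acme corp']
```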
emm.helper.spark_custom_reader_writer module
emm.helper.spark_ml_pipeline module
emm.helper.spark_utils module
- emm.helper.spark_utils.add_uid_column(sdf, uid_col='uid')
monotonically_increasing_id() is recalculated during transform and can give different values for the same rows. Therefore the DataFrame needs to be saved temporarily, for example with checkpointing.
- emm.helper.spark_utils.auto_repartitioning(sdf, partition_size, *cols)
Repartition a Spark DataFrame so that it has ‘partition_size’ rows per partition. If partition_size is None, no repartitioning is done. Returns the repartitioned DataFrame and the size of the dataset.
- Parameters:
sdf (DataFrame)
partition_size (int | None)
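The target partition count follows from simple arithmetic: with n rows and about partition_size rows per partition, roughly ceil(n / partition_size) partitions are needed. The sketch below assumes exactly that rule with a minimum of one partition; num_partitions_for is hypothetical and the real function may round differently. In Spark the resulting count would then be passed to DataFrame.repartition.

```python
from typing import Optional


def num_partitions_for(n_rows: int, partition_size: Optional[int]) -> Optional[int]:
    """Hypothetical helper: partitions needed for about partition_size rows each.

    Returns None when partition_size is None (no repartitioning).
    """
    if partition_size is None:
        return None
    # Integer ceiling division, with at least one partition for an empty dataset.
    return max(1, (n_rows + partition_size - 1) // partition_size)


print(num_partitions_for(1_000_000, 50_000))  # 20
print(num_partitions_for(1_000_001, 50_000))  # 21
print(num_partitions_for(10, None))  # None
```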
- emm.helper.spark_utils.check_uid(sdf, uid_col)
Check if the uid column is present and add it if missing.
- emm.helper.spark_utils.logical_repartitioning(df, column, num_partitions=None, spark=None)
Make sure that all the candidates of a names_to_match record (uid) are in the same partition; this is needed for computing the rank feature in the pandas UDF. The repartitioning needs to happen after the computation of the missing feature, most probably because the vectorizer does some repartitioning itself. Repartitioning is therefore needed both for a logical reason and for Spark data/execution parallelism.
repartition(k, col) creates a DataFrame with k partitions using a hash-based partitioner on col. The DataFrame is repartitioned with the same number of partitions as before, so the count is related to spark.sql.shuffle.partitions and spark.default.parallelism. It could also be repartitioned as a function of partition_size.
- Parameters:
df (DataFrame)
column (str)
num_partitions (int | None)
spark (SparkSession | None)
- Return type:
DataFrame
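Why hash partitioning on the uid column co-locates all candidates of a name can be shown in pure Python: the partition index depends only on the key, so rows with the same uid always land in the same partition. This is an illustrative sketch of the idea behind repartition(k, col); hash_partition is hypothetical, and crc32 stands in for Spark's own Murmur3-based hash.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def hash_partition(rows: List[Tuple[int, str]], num_partitions: int) -> Dict[int, List[Tuple[int, str]]]:
    """Assign each (uid, candidate) row to a partition based on a hash of uid only."""
    partitions: Dict[int, List[Tuple[int, str]]] = defaultdict(list)
    for uid, candidate in rows:
        key = zlib.crc32(str(uid).encode()) % num_partitions
        partitions[key].append((uid, candidate))
    return dict(partitions)


rows = [(1, "acme corp"), (2, "globex"), (1, "acme inc"), (3, "initech"), (2, "globex ltd")]
parts = hash_partition(rows, num_partitions=2)

# Every uid ends up in exactly one partition, so per-uid ranking can run locally.
uid_to_parts: Dict[int, Set[int]] = defaultdict(set)
for p, part_rows in parts.items():
    for uid, _ in part_rows:
        uid_to_parts[uid].add(p)
print(all(len(ps) == 1 for ps in uid_to_parts.values()))  # True
```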
- emm.helper.spark_utils.set_partitions(num_partitions, spark=None)
- Parameters:
num_partitions (int)
spark (SparkSession | None)
- Return type:
None
- emm.helper.spark_utils.set_spark_job_group(*args, spark=None, **kwargs)
Label the Spark job group.
- Args:
spark: Spark session (optional)
*args: args to pass to setJobGroup
**kwargs: kwargs to pass to setJobGroup
- Parameters:
spark (SparkSession | None)
- Return type:
None
- emm.helper.spark_utils.spark_checkpoint(sdf, spark=None)
- Parameters:
sdf (DataFrame)
spark (SparkSession | None)
- Return type:
DataFrame
emm.helper.util module
Helper functions for name matching model save and load.
- emm.helper.util.get_model_title(params)
Construct model title from parameter settings.
Extract the model title based on the model’s indexer settings.
- Args:
params: model parameters
- Parameters:
params (dict)
- Return type:
str
- emm.helper.util.groupby(data, groups, postprocess_func=None)
Aggregates data using grouping values from groups. Returns a dictionary with keys from groups and lists of matching values from data. If a postprocessing function is defined, all dictionary values are processed with this function.
- Parameters:
data (Iterable)
groups (Iterable)
postprocess_func (Optional[Callable])
- Return type:
Mapping
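A minimal sketch of the described behaviour, assuming data and groups are aligned by position (the i-th value belongs to the i-th group). groupby_sketch is a hypothetical name, not the library function.

```python
from collections import defaultdict
from typing import Callable, Iterable, Mapping, Optional


def groupby_sketch(data: Iterable, groups: Iterable, postprocess_func: Optional[Callable] = None) -> Mapping:
    """Hypothetical re-implementation: pair data and groups by position."""
    result = defaultdict(list)
    for value, key in zip(data, groups):
        result[key].append(value)
    if postprocess_func is not None:
        # Apply the post-processing function to every list of values.
        return {key: postprocess_func(values) for key, values in result.items()}
    return dict(result)


print(groupby_sketch([1, 2, 3, 4], ["a", "b", "a", "b"]))  # {'a': [1, 3], 'b': [2, 4]}
print(groupby_sketch([1, 2, 3, 4], ["a", "b", "a", "b"], postprocess_func=sum))  # {'a': 4, 'b': 6}
```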
- emm.helper.util.indexers_set_values(default_indexer_params, indexers)
Helper function to update indexer settings.
Update indexer settings with default values where values are missing. Used when initializing indexers and in parameters.py.
- Args:
default_indexer_params: dict with default indexer settings
indexers: dict with indexer settings that should be updated
- Parameters:
default_indexer_params (list[Mapping[str, Any]])
indexers (list[Mapping[str, Any]])
- Return type:
list[Mapping[str,Any]]
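The core rule, "use the default where a value is missing", amounts to a dictionary merge in which explicit settings take precedence. The sketch below assumes "missing" means the key is absent and uses hypothetical setting names; how the real function matches defaults to each indexer in the list is not shown, so the same defaults are simply applied to every indexer here.

```python
from typing import Any, Dict, List, Mapping


def fill_missing(defaults: Mapping[str, Any], settings: Mapping[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: keep explicit settings, fill absent keys from defaults."""
    # Later entries win, so explicit settings override the defaults.
    return {**defaults, **settings}


# Hypothetical setting names, for illustration only.
defaults = {"num_candidates": 10, "lower_bound": 0.5}
indexers: List[Dict[str, Any]] = [{"num_candidates": 5}, {}]
updated = [fill_missing(defaults, ind) for ind in indexers]
print(updated)  # [{'num_candidates': 5, 'lower_bound': 0.5}, {'num_candidates': 10, 'lower_bound': 0.5}]
```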
- emm.helper.util.rename_columns(df, mapping)
Rename columns of a Pandas or Spark DataFrame according to the mapping.
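One function can serve both DataFrame types by dispatching on the input type. A minimal sketch, assuming mapping is a dict from old to new column names; rename_columns_sketch is hypothetical. pandas.DataFrame.rename and the PySpark DataFrame.withColumnRenamed method are the standard calls for each case; only the pandas branch runs here.

```python
from typing import Any, Mapping

import pandas as pd


def rename_columns_sketch(df: Any, mapping: Mapping[str, str]) -> Any:
    """Hypothetical helper: rename columns of a pandas or Spark DataFrame."""
    if isinstance(df, pd.DataFrame):
        # pandas renames all columns in one call and returns a new DataFrame.
        return df.rename(columns=dict(mapping))
    # Assumption: any other input is a Spark DataFrame, renamed one column at a time.
    for old, new in mapping.items():
        df = df.withColumnRenamed(old, new)
    return df


pdf = pd.DataFrame({"name": ["acme"], "id": [1]})
print(list(rename_columns_sketch(pdf, {"name": "entity_name"}).columns))  # ['entity_name', 'id']
```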
- emm.helper.util.string_columns_to_pyarrow(df, columns=None)
Convert string columns to pyarrow string datatype.
The pyarrow string datatype is much more memory efficient, which is important for large lists of names (1M+).
- Args:
df: input pandas DataFrame to convert
columns: columns to convert to pyarrow string type. If None, pick known relevant columns.
- Returns:
converted DataFrame
- Parameters:
df (DataFrame)
columns (Optional[list])
- Return type:
DataFrame