feathr package

Module contents

class feathr.Aggregation(value)[source]

Bases: Enum

The built-in aggregation functions for LookupFeature

AVG = 1
ELEMENTWISE_AVG = 6
ELEMENTWISE_MAX = 8
ELEMENTWISE_MIN = 7
ELEMENTWISE_SUM = 9
LATEST = 10
MAX = 2
MIN = 3
NOP = 0
SUM = 4
UNION = 5
class feathr.AvroJsonSchema(schemaStr: str)[source]

Bases: SourceSchema

Avro schema written in Json string form.

to_feature_config()[source]

Convert the feature anchor definition into internal HOCON format.

class feathr.BackfillTime(start: datetime, end: datetime, step: timedelta)[source]

Bases: object

Time range to materialize/backfill feature data.

start

start time of the backfill, inclusive.

end

end time of the backfill, inclusive.

step

duration of each backfill step. e.g. if daily, use timedelta(days=1)
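
As a minimal sketch, a daily backfill over the first five days of March 2022 (the dates are illustrative) could be declared as:

    from datetime import datetime, timedelta

    from feathr import BackfillTime

    # One materialization step per day from 2022-03-01 through 2022-03-05 (inclusive).
    backfill_time = BackfillTime(start=datetime(2022, 3, 1),
                                 end=datetime(2022, 3, 5),
                                 step=timedelta(days=1))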

class feathr.BooleanFeatureType[source]

Bases: FeatureType

Boolean feature value, either true or false.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.BytesFeatureType[source]

Bases: FeatureType

Bytes feature value.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.DerivedFeature(name: str, feature_type: ~feathr.definition.dtype.FeatureType, input_features: ~typing.Union[~feathr.definition.feature.FeatureBase, ~typing.List[~feathr.definition.feature.FeatureBase]], transform: ~typing.Union[str, ~feathr.definition.transformation.RowTransformation], key: ~typing.Optional[~typing.Union[~feathr.definition.typed_key.TypedKey, ~typing.List[~feathr.definition.typed_key.TypedKey]]] = [<feathr.definition.typed_key.TypedKey object>], registry_tags: ~typing.Optional[~typing.Dict[str, str]] = None)[source]

Bases: FeatureBase

A derived feature is a feature defined on top of other features, rather than external data source.

name

derived feature name

feature_type

type of derived feature

key

All features with corresponding keys that this derived feature depends on

input_features

features that this derived feature depends on

transform

transformation that produces the derived feature value, based on the input_features

registry_tags

A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this feature is deprecated, etc.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

validate_feature()[source]

Validate the derived feature is valid
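
A minimal sketch of a derived feature built on two anchored features; the feature names, key, and column expressions below are illustrative assumptions, not part of the API:

    from feathr import DerivedFeature, Feature, FloatFeatureType, TypedKey, ValueType

    # Hypothetical entity key and input features.
    trip_key = TypedKey(key_column="trip_id", key_column_type=ValueType.INT32)
    f_trip_distance = Feature(name="f_trip_distance", feature_type=FloatFeatureType(),
                              key=trip_key, transform="trip_distance")
    f_trip_time_duration = Feature(name="f_trip_time_duration", feature_type=FloatFeatureType(),
                                   key=trip_key, transform="trip_time_duration")

    # Derived feature computed from the two input features above.
    f_speed = DerivedFeature(name="f_speed",
                             feature_type=FloatFeatureType(),
                             key=trip_key,
                             input_features=[f_trip_distance, f_trip_time_duration],
                             transform="f_trip_distance / f_trip_time_duration")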

class feathr.DoubleFeatureType[source]

Bases: FeatureType

Double feature value, for example, 1.3d, 2.4d. Double has better precision than float.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.DoubleVectorFeatureType[source]

Bases: FeatureType

Double vector feature value, for example, [1.3d, 3.3d, 9.3d]

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.FeathrClient(config_path: str = './feathr_config.yaml', local_workspace_dir: Optional[str] = None, credential=None, project_registry_tag: Optional[Dict[str, str]] = None)[source]

Bases: object

Feathr client.

The client is used to create training dataset, materialize features, register features, and fetch features from the online storage.

For offline storage and compute engine, Azure ADLS, AWS S3 and Azure Synapse are supported.

For online storage, currently only Redis is supported. The user of this client is responsible for setting up all the necessary information needed to start a Redis client, via environment variables or a Spark cluster. Host address, port, and password are needed to start the Redis client.

config_path

config path. See [Feathr Config Template](https://github.com/linkedin/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for more details. Defaults to “./feathr_config.yaml”.

Type

str, optional

local_workspace_dir

sets the local workspace directory. If not set, Feathr will create a temporary folder to store local workspace-related files.

Type

str, optional

credential

credential to access cloud resources, most likely to be the returned result of DefaultAzureCredential(). If not set, Feathr will initialize DefaultAzureCredential() inside the __init__ function to get credentials.

Type

optional

project_registry_tag

tags to add for the project in the Feathr registry. This might be useful if you want to tag your project as deprecated, or allow certain customizations at the project level. Default is empty.

Type

Dict[str, str]

Raises
  • RuntimeError – Failed to create the client because the environment variables needed for Redis client creation are not set.

build_features(anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = [], verbose: bool = False)[source]

Build features based on the current workspace. All actions that trigger a Spark job will be based on the result of this action.
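
A typical call, sketched under the assumption that request_anchor, agg_anchor, and f_speed are anchor/derived-feature objects defined as in the FeatureAnchor and DerivedFeature examples on this page:

    from feathr import FeathrClient

    client = FeathrClient(config_path="./feathr_config.yaml")

    # Register the locally defined anchors and derived features with the client
    # so that later offline/materialization jobs can reference them.
    client.build_features(anchor_list=[request_anchor, agg_anchor],
                          derived_feature_list=[f_speed])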

get_features_from_registry(project_name: str) Dict[str, FeatureBase][source]

Get features from the registry by project name. The features retrieved from the registry are automatically built.

get_job_result_uri(block=True, timeout_sec=300) str[source]

Gets the job output URI

get_job_tags() Dict[str, str][source]

Gets the job tags

get_offline_features(observation_settings: ObservationSettings, feature_query: Union[FeatureQuery, List[FeatureQuery]], output_path: str, execution_configuratons: Optional[Union[SparkExecutionConfiguration, Dict[str, str]]] = None, udf_files=None, verbose: bool = False)[source]

Get offline features for the observation dataset.

Parameters
  • observation_settings – settings of the observation data, e.g. timestamp columns, input path, etc.

  • feature_query – features that are requested to be added onto the observation data

  • output_path – output path of the job, i.e. the observation data with features attached.

  • execution_configuratons – a dict that will be passed to the Spark job when the job starts up, i.e. the “spark configurations”. Note that not all of the configurations will be honored, since some of them are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of Spark configurations.
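
A hedged sketch of a typical offline join; the storage paths, timestamp column, and feature name are placeholders, and the example assumes client.build_features() has already been called with an anchor producing f_location_avg_fare:

    from feathr import FeatureQuery, ObservationSettings, TypedKey, ValueType

    # `client` is a FeathrClient with features already built via build_features().
    location_key = TypedKey(key_column="DOLocationID", key_column_type=ValueType.INT32)
    query = FeatureQuery(feature_list=["f_location_avg_fare"], key=location_key)
    settings = ObservationSettings(
        observation_path="abfss://container@storage.dfs.core.windows.net/observation.csv",
        event_timestamp_column="lpep_dropoff_datetime",
        timestamp_format="yyyy-MM-dd HH:mm:ss")

    client.get_offline_features(observation_settings=settings,
                                feature_query=query,
                                output_path="abfss://container@storage.dfs.core.windows.net/output")
    client.wait_job_to_finish(timeout_sec=500)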

get_online_features(feature_table, key, feature_names)[source]

Fetches feature values for a certain key from an online feature table.

Parameters
  • feature_table – the name of the feature table.

  • key – the key of the entity

  • feature_names – list of feature names to fetch

Returns

A list of feature values for this entity. It’s ordered by the requested feature names. For example, feature_names = [‘f_is_medium_trip_distance’, ‘f_day_of_week’, ‘f_day_of_month’, ‘f_hour_of_day’] then, the returned feature values is: [b’true’, b’4.0’, b’31.0’, b’23.0’]. If the feature_table or key doesn’t exist, then a list of Nones are returned. For example, [None, None, None, None]. If a feature doesn’t exist, then a None is returned for that feature. For example: [None, b’4.0’, b’31.0’, b’23.0’].
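
For example, assuming features were materialized to a Redis table named nycTaxiDemoFeature (the table, key, and feature names are illustrative):

    # `client` is a FeathrClient as constructed earlier on this page.
    values = client.get_online_features(feature_table="nycTaxiDemoFeature",
                                        key="265",
                                        feature_names=["f_location_avg_fare",
                                                       "f_location_max_fare"])
    # e.g. [b'31.5', b'185.0'], ordered the same way as feature_names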

list_registered_features(project_name: Optional[str] = None) List[str][source]

List all the already registered features. If project_name is not provided or is None, it will return all the registered features; otherwise it will only return features under this project

materialize_features(settings: MaterializationSettings, execution_configuratons: Optional[Union[SparkExecutionConfiguration, Dict[str, str]]] = None, verbose: bool = False)[source]

Materialize feature data

Parameters
  • settings – Feature materialization settings

  • execution_configuratons – a dict that will be passed to the Spark job when the job starts up, i.e. the “spark configurations”. Note that not all of the configurations will be honored, since some of them are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of Spark configurations.
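
A minimal materialization sketch writing two illustrative features to a Redis sink with a daily backfill; the job name, table name, feature names, and dates are assumptions:

    from datetime import datetime, timedelta

    from feathr import BackfillTime, MaterializationSettings, RedisSink

    backfill_time = BackfillTime(start=datetime(2022, 3, 1),
                                 end=datetime(2022, 3, 5),
                                 step=timedelta(days=1))
    settings = MaterializationSettings(name="nycTaxiMaterializationJob",
                                       sinks=[RedisSink(table_name="nycTaxiDemoFeature")],
                                       feature_names=["f_location_avg_fare",
                                                      "f_location_max_fare"],
                                       backfill_time=backfill_time)

    # `client` is a FeathrClient with features already built via build_features().
    client.materialize_features(settings)
    client.wait_job_to_finish(timeout_sec=500)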

multi_get_online_features(feature_table, keys, feature_names)[source]

Fetches feature values for a list of keys from an online feature table. This is the batch version of the get API.

Parameters
  • feature_table – the name of the feature table.

  • keys – list of keys for the entities

  • feature_names – list of feature names to fetch

Returns

A list of feature values for the requested entities. It’s ordered by the requested feature names. For example, keys = [12, 24], feature_names = [‘f_is_medium_trip_distance’, ‘f_day_of_week’, ‘f_day_of_month’, ‘f_hour_of_day’] then, the returned feature values is: {‘12’: [b’false’, b’5.0’, b’1.0’, b’0.0’], ‘24’: [b’true’, b’4.0’, b’31.0’, b’23.0’]}. If the feature_table or key doesn’t exist, then a list of Nones are returned. For example, {‘12’: [None, None, None, None], ‘24’: [None, None, None, None]} If a feature doesn’t exist, then a None is returned for that feature. For example: {‘12’: [None, b’4.0’, b’31.0’, b’23.0’], ‘24’: [b’true’, b’4.0’, b’31.0’, b’23.0’]}.

register_features(from_context: bool = True)[source]

Registers features based on the current workspace

Parameters
  • from_context – If from_context is True (default), the features will be generated from the current context, with the features previously built in client.build_features(). Otherwise, the features will be generated from configuration files.

wait_job_to_finish(timeout_sec: int = 300)[source]

Waits for the job to finish in a blocking way unless it times out

class feathr.Feature(name: str, feature_type: ~feathr.definition.dtype.FeatureType, key: ~typing.Optional[~typing.Union[~feathr.definition.typed_key.TypedKey, ~typing.List[~feathr.definition.typed_key.TypedKey]]] = [<feathr.definition.typed_key.TypedKey object>], transform: ~typing.Optional[~typing.Union[str, ~feathr.definition.transformation.Transformation]] = None, registry_tags: ~typing.Optional[~typing.Dict[str, str]] = None)[source]

Bases: FeatureBase

A feature is an individual measurable property or characteristic of an entity. It has a feature name, feature type, and a convenient row transformation used to produce its feature value.

name

Unique name of the feature. Only letters, numbers, and ‘_’ are allowed in the name. It cannot start with a number. Note that ‘.’ is NOT ALLOWED!

feature_type

the feature value type. e.g. INT32, FLOAT, etc. Should be part of feathr.dtype

key

The key of this feature. e.g. user_id.

transform

A row transformation used to produce its feature value. e.g. amount * 10

registry_tags

A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this feature is deprecated, etc.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)
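
Two illustrative feature definitions: one using a plain row-transformation expression against the observation data, and one using a sliding-window aggregation keyed by a TypedKey. The column names and expression syntax (e.g. cast_float) follow the public Feathr samples and are assumptions here:

    from feathr import (BooleanFeatureType, Feature, FloatFeatureType, TypedKey,
                        ValueType, WindowAggTransformation)

    # Hypothetical entity key.
    location_key = TypedKey(key_column="DOLocationID", key_column_type=ValueType.INT32,
                            description="location id", full_name="nyc_taxi.location_id")

    # Feature computed by a simple row transformation on the source/observation data.
    f_is_long_trip = Feature(name="f_is_long_trip_distance",
                             feature_type=BooleanFeatureType(),
                             transform="trip_distance > 30")

    # Feature computed by a 90-day sliding-window average, keyed by location.
    f_location_avg_fare = Feature(name="f_location_avg_fare",
                                  feature_type=FloatFeatureType(),
                                  key=location_key,
                                  transform=WindowAggTransformation(
                                      agg_expr="cast_float(fare_amount)",
                                      agg_func="AVG",
                                      window="90d"))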

class feathr.FeatureAnchor(name: str, source: Source, features: List[Feature], registry_tags: Optional[Dict[str, str]] = None)[source]

Bases: HoconConvertible

A feature anchor defines a set of features on top of a data source, a.k.a. a set of features anchored to a source.

The feature producer writes multiple anchors for a feature, exposing the same feature name for the feature consumer to reference it.

name

Unique name of the anchor.

source

data source that the features are anchored to. Should be either INPUT_CONTEXT or a feathr.source.Source

features

list of features defined within this anchor.

registry_tags

A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this anchor is deprecated, etc.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

validate_features()[source]

Validate that anchor is non-empty and all its features share the same key
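
Continuing the Feature sketch above, the window-aggregated feature can be anchored to a batch source and the request feature to the observation data itself; the source name and path are placeholders:

    from feathr import FeatureAnchor, HdfsSource, InputContext

    batch_source = HdfsSource(name="nycTaxiBatchSource",
                              path="abfss://container@storage.dfs.core.windows.net/green_tripdata.csv",
                              event_timestamp_column="lpep_dropoff_datetime",
                              timestamp_format="yyyy-MM-dd HH:mm:ss")

    # Anchor the aggregated feature to the batch source.
    agg_anchor = FeatureAnchor(name="aggregationFeatures",
                               source=batch_source,
                               features=[f_location_avg_fare])

    # Anchor the request feature to the observation data itself (InputContext).
    request_anchor = FeatureAnchor(name="requestFeatures",
                                   source=InputContext(),
                                   features=[f_is_long_trip])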

exception feathr.FeatureNameValidationError[source]

Bases: ValueError

An exception for feature name validation. Feature names must consist of letters, numbers, or underscores, and cannot begin with a number. Periods are also disallowed, as some compute engines, such as Spark, will treat them as operators in the feature name.

class feathr.FeaturePrinter[source]

Bases: object

The class for pretty-printing features

static pretty_print_anchors(anchor_list: List[FeatureAnchor]) None[source]

Pretty print features

Parameters

anchor_list – a list of FeatureAnchor to print

static pretty_print_feature_query(feature_query: FeatureQuery) None[source]

Pretty print feature query

Parameters

feature_query – feature query

static pretty_print_materialize_features(settings: MaterializationSettings) None[source]

Pretty print materialization settings

Parameters

settings – materialization settings

class feathr.FeatureQuery(feature_list: List[str], key: Optional[Union[TypedKey, List[TypedKey]]] = None)[source]

Bases: HoconConvertible

A FeatureQuery contains a list of features

feature_list

a list of feature names

key

key of the features in feature_list; all features must share the same key

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.FloatFeatureType[source]

Bases: FeatureType

Float feature value, for example, 1.3f, 2.4f.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.FloatVectorFeatureType[source]

Bases: FeatureType

Float vector feature value, for example, [1.3f, 2.4f, 3.9f]

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.HdfsSource(name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = 'epoch', registry_tags: Optional[Dict[str, str]] = None)[source]

Bases: Source

A data source (table) stored on an HDFS-like file system. Data can be fetched through a POSIX-style path.

name

name of the source

Type

str

path

The location of the source data.

Type

str

preprocessing

A preprocessing python function that transforms the source data for further feature transformation.

Type

Optional[Callable]

event_timestamp_column

The timestamp field of your record. Sliding window aggregation features assume each record in the source data has a timestamp column.

Type

Optional[str]

timestamp_format

The format of the timestamp field. Defaults to “epoch”. Possible values are:
  • epoch (seconds since epoch), for example 1647737463

  • epoch_millis (milliseconds since epoch), for example 1647737517761

  • any date format supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html)

Type

Optional[str], optional

registry_tags

A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this source is deprecated, etc.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)
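
A sketch showing the optional preprocessing hook; in the public Feathr samples the preprocessing function receives and returns a Spark DataFrame, and the source name, path, and column names below are illustrative:

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col

    from feathr import HdfsSource

    def add_fare_with_tip(df: DataFrame) -> DataFrame:
        # Hypothetical preprocessing: add a column before feature transformation runs.
        return df.withColumn("fare_amount_with_tip", col("fare_amount") + col("tip_amount"))

    batch_source = HdfsSource(name="nycTaxiBatchSource",
                              path="abfss://container@storage.dfs.core.windows.net/green_tripdata.csv",
                              preprocessing=add_fare_with_tip,
                              event_timestamp_column="lpep_dropoff_datetime",
                              timestamp_format="yyyy-MM-dd HH:mm:ss")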

class feathr.InputContext[source]

Bases: Source

‘Passthrough’ source, a.k.a. request feature source. Feature source data comes from the observation data itself. For example, you have an observation data table t1 and another feature data table t2. Some of your feature data can be transformed from the observation data table t1 itself, such as geo location; in that case you can define that feature on top of the InputContext.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.Int32FeatureType[source]

Bases: FeatureType

32-bit integer feature value, for example, 123, 98765.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.Int32VectorFeatureType[source]

Bases: FeatureType

32-bit integer vector feature value, for example, [1, 3, 9]

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.Int64FeatureType[source]

Bases: FeatureType

64-bit integer (a.k.a. Long in some systems) feature value, for example, 123, 98765, but stored as a 64-bit integer.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.Int64VectorFeatureType[source]

Bases: FeatureType

64-bit integer vector feature value, for example, [1, 3, 9]

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.KafKaSource(name: str, kafkaConfig: KafkaConfig)[source]

Bases: Source

A Kafka source object. Used in streaming feature ingestion.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.KafkaConfig(brokers: List[str], topics: List[str], schema: SourceSchema)[source]

Bases: object

Kafka config for a streaming source

brokers

broker/server address

topics

Kafka topics

schema

Kafka message schema
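
A streaming-source sketch combining KafkaConfig, AvroJsonSchema, and KafKaSource; the broker address, topic name, and schema contents are placeholders:

    from feathr import AvroJsonSchema, KafkaConfig, KafKaSource

    # Avro schema of the Kafka messages, written as a JSON string.
    schema = AvroJsonSchema(schemaStr="""
    {
        "type": "record",
        "name": "DriverTrips",
        "fields": [
            {"name": "driver_id", "type": "long"},
            {"name": "trips_today", "type": "int"},
            {"name": "datetime", "type": {"type": "long", "logicalType": "timestamp-micros"}}
        ]
    }
    """)

    stream_source = KafKaSource(name="kafkaStreamingSource",
                                kafkaConfig=KafkaConfig(brokers=["<broker-host>:9093"],
                                                        topics=["<topic-name>"],
                                                        schema=schema))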

class feathr.LookupFeature(name: str, feature_type: ~feathr.definition.dtype.FeatureType, base_feature: ~feathr.definition.feature.FeatureBase, expansion_feature: ~feathr.definition.feature.FeatureBase, aggregation: ~feathr.definition.aggregation.Aggregation, key: ~typing.Optional[~typing.Union[~feathr.definition.typed_key.TypedKey, ~typing.List[~feathr.definition.typed_key.TypedKey]]] = [<feathr.definition.typed_key.TypedKey object>], registry_tags: ~typing.Optional[~typing.Dict[str, str]] = None)[source]

Bases: FeatureBase

A lookup feature is a feature defined on top of two other features: the feature value of the base feature is used as a key to look up the feature value from the expansion feature. For example, a lookup feature user_purchased_item_avg_price could be keyed by user_id and computed as follows: the base feature is user_purchased_item_ids, which for a given user_id returns the item ids purchased by the user; the expansion feature is item_price, which for a given item id returns the item price; the aggregation function is the average of the item prices.

name

Derived feature name

feature_type

Type of derived feature

key

Join key of the derived feature

base_feature

The feature value of this feature will be used as the key to look up values from the expansion feature

expansion_feature

The feature to be looked up

aggregation

Specifies the aggregation applied to the feature values looked up from the expansion feature, in case the base feature value needs to be converted into multiple lookup keys, e.g. when the feature value is an array and each value in the array is used once as a lookup key.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)
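
A sketch of the user_purchased_item_avg_price example from the class description; the base and expansion features are hypothetical and their anchors are omitted:

    from feathr import (Aggregation, Feature, FloatFeatureType, Int32VectorFeatureType,
                        LookupFeature, TypedKey, ValueType)

    user_key = TypedKey(key_column="user_id", key_column_type=ValueType.INT32)
    item_key = TypedKey(key_column="item_id", key_column_type=ValueType.INT32)

    # Hypothetical base and expansion features.
    user_purchased_item_ids = Feature(name="user_purchased_item_ids",
                                      feature_type=Int32VectorFeatureType(),
                                      key=user_key)
    item_price = Feature(name="item_price", feature_type=FloatFeatureType(), key=item_key)

    # For each user, look up the prices of the purchased items and average them.
    user_purchased_item_avg_price = LookupFeature(name="user_purchased_item_avg_price",
                                                  feature_type=FloatFeatureType(),
                                                  key=user_key,
                                                  base_feature=user_purchased_item_ids,
                                                  expansion_feature=item_price,
                                                  aggregation=Aggregation.AVG)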

class feathr.MaterializationSettings(name: str, sinks: List[Sink], feature_names: List[str], backfill_time: Optional[BackfillTime] = None)[source]

Bases: object

Settings for materializing features.

name

The materialization job name

sinks

sinks where the materialized features should be written

feature_names

list of feature names to be materialized

backfill_time

time range and frequency for the materialization. Defaults to now().

get_backfill_cutoff_time() List[datetime][source]

Get the backfill cutoff time points for materialization. E.g. for BackfillTime(start=datetime(2022, 3, 1), end=datetime(2022, 3, 5), step=timedelta(days=1)), it returns the cutoff time list [2022-3-1, 2022-3-2, 2022-3-3, 2022-3-4, 2022-3-5]; for BackfillTime(start=datetime(2022, 3, 1, 1), end=datetime(2022, 3, 1, 5), step=timedelta(hours=1)), it returns the cutoff time list [2022-3-1 01:00:00, 2022-3-1 02:00:00, 2022-3-1 03:00:00, 2022-3-1 04:00:00, 2022-3-1 05:00:00].

class feathr.ObservationSettings(observation_path: str, event_timestamp_column: Optional[str] = None, timestamp_format: str = 'epoch')[source]

Bases: HoconConvertible

Time settings of the observation data. Used in feature join.

observation_path

path to the observation dataset, i.e. the input dataset to be joined with features

event_timestamp_column

The timestamp field of your record. Sliding window aggregation features assume each record in the source data has a timestamp column.

Type

Optional[str]

timestamp_format

The format of the timestamp field. Defaults to “epoch”. Possible values are:
  • epoch (seconds since epoch), for example 1647737463

  • epoch_millis (milliseconds since epoch), for example 1647737517761

  • any date format supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html)

Type

Optional[str], optional

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.RedisSink(table_name: str, streaming: bool = False, streamingTimeoutMs: Optional[int] = None)[source]

Bases: Sink

Redis-based sink used to store online feature data. It can be used in batch or streaming jobs.

table_name

output table name

streaming

whether it is used in streaming mode

streamingTimeoutMs

maximum running time for streaming mode. It is not used in batch mode.

to_feature_config() str[source]

Produce the config used in feature materialization

class feathr.Source(name: str, event_timestamp_column: Optional[str] = '0', timestamp_format: Optional[str] = 'epoch', registry_tags: Optional[Dict[str, str]] = None)[source]

Bases: HoconConvertible

External data source for feature. Typically a data ‘table’.

name

name of the source. It’s used to differentiate from other sources.

event_timestamp_column

column name or field name of the event timestamp

timestamp_format

the format of the event_timestamp_column, e.g. yyyy/MM/DD, or EPOCH

registry_tags

A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this source is deprecated, etc.

class feathr.SparkExecutionConfiguration(spark_execution_configuration=typing.Dict[str, str])[source]

Bases: object

A wrapper class to enable Spark Execution Configurations which will be passed to the underlying spark engine.

spark_execution_configuration

dict[str, str] which will be passed to the underlying spark engine

Returns

dict[str, str]
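
A sketch of how the wrapper is typically passed to a job submission call via the execution_configuratons parameter; the configuration key shown is an ordinary Spark setting used for illustration:

    from feathr import SparkExecutionConfiguration

    # Wrap plain Spark settings so they can be handed to get_offline_features()
    # or materialize_features() via the execution_configuratons parameter.
    spark_config = SparkExecutionConfiguration({"spark.executor.memory": "4g"})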

class feathr.StringFeatureType[source]

Bases: FeatureType

String feature value, for example, ‘apple’, ‘orange’.

to_feature_config() str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)

class feathr.TypedKey(key_column: str, key_column_type: ValueType, full_name: Optional[str] = None, description: Optional[str] = None, key_column_alias: Optional[str] = None)[source]

Bases: object

The key of a feature. A feature is typically keyed by some id(s), e.g. product id, user id.

key_column

The id column name of this key, e.g. ‘product_id’.

key_column_type

Types of the key_column

full_name

Unique name of the key. It is recommended to use [project_name].[key_name], e.g. ads.user_id

description

Documentation for the key.

key_column_alias

Used in some advanced derived features. Defaults to the key_column.

as_key(key_column_alias: str) TypedKey[source]

Rename the key alias. This is useful in derived features that depend on the same feature with different keys.
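
For instance, the same key can be reused under a different alias when a derived feature depends on the same underlying feature in two roles; the column and project names are illustrative:

    from feathr import TypedKey, ValueType

    user_key = TypedKey(key_column="user_id",
                        key_column_type=ValueType.INT32,
                        description="user id",
                        full_name="ads.user_id")

    # Alias the key so the same feature can be joined under a different role.
    viewer_key = user_key.as_key("viewer_id")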

class feathr.ValueType(value)[source]

Bases: Enum

Data type to describe feature keys or observation keys.

UNSPECIFIED

key data type is unspecified.

BOOL

key data type is boolean, either true or false

INT32

key data type is 32-bit integer, for example, an invoice id, 93231.

INT64

key data type is 64-bit integer, for example, an invoice id, 93231.

FLOAT

key data type is float, for example, 123.4f.

DOUBLE

key data type is double, for example, 123.4d.

STRING

key data type is string, for example, a user name, ‘user_joe’

BYTES

key data type is bytes.

BOOL = 1
BYTES = 7
DOUBLE = 5
FLOAT = 4
INT32 = 2
INT64 = 3
STRING = 6
UNSPECIFIED = 0
class feathr.WindowAggTransformation(agg_expr: str, agg_func: str, window: str, group_by: Optional[str] = None, filter: Optional[str] = None, limit: Optional[int] = None)[source]

Bases: Transformation

Aggregate the value of an expression over a fixed time window. E.g. sum(amount*10) over last 3 days.

agg_expr

expression that transforms the raw value into a new value, e.g. amount * 10

agg_func

aggregation function. Available values: SUM, COUNT, MAX, MIN, AVG, MAX_POOLING, MIN_POOLING, AVG_POOLING, LATEST

window

Time window length over which to apply the aggregation. Supports 4 types of units: d(day), h(hour), m(minute), s(second). Example values are “7d”, “5h”, “3m”, or “1s”.

group_by

Feathr expression applied after the agg_expr transformation as the group-by field, before aggregation; same as ‘group by’ in SQL

filter

Feathr expression applied to each row as a filter before aggregation

to_feature_config(with_def_field_name: Optional[bool] = True) str[source]

Convert the feature anchor definition into internal HOCON format. (For internal use only)
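
An illustrative transformation that averages an expression per group over a three-day window with a row filter; the column names are assumptions:

    from feathr import WindowAggTransformation

    # Average fare per payment type over the last 3 days, keeping only positive fares.
    agg = WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                  agg_func="AVG",
                                  window="3d",
                                  group_by="payment_type",
                                  filter="fare_amount > 0")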

feathr.get_result_df(client: FeathrClient, format: Optional[str] = None, res_url: Optional[str] = None) DataFrame[source]

Download the job result dataset from cloud as a Pandas dataframe.

Parameters
  • format – format override, could be “parquet”, “delta”, etc.

  • res_url – output URL to download files. Note that this will not block the job, so you need to make sure the job is finished and the result URL contains actual data.
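
A usage sketch, assuming a get_offline_features() job was submitted earlier with the same client:

    from feathr import FeathrClient, get_result_df

    client = FeathrClient(config_path="./feathr_config.yaml")
    # ... submit a job, e.g. client.get_offline_features(...) ...
    client.wait_job_to_finish(timeout_sec=500)

    # Download the finished job's output as a pandas DataFrame.
    df = get_result_df(client)
    print(df.head())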