Welcome to Feathr’s Python API documentation!
If you are an end user, read Feathr User APIs.
If you have any suggestions for our API documentation, please help us improve it by creating a GitHub issue.
Feathr APIs for End Users
- class feathr.Aggregation(value)[source]
Bases:
Enum
The built-in aggregation functions for LookupFeature
- AVG = 1
- ELEMENTWISE_AVG = 6
- ELEMENTWISE_MAX = 8
- ELEMENTWISE_MIN = 7
- ELEMENTWISE_SUM = 9
- LATEST = 10
- MAX = 2
- MIN = 3
- NOP = 0
- SUM = 4
- UNION = 5
- class feathr.AvroJsonSchema(schemaStr: str)[source]
Bases:
SourceSchema
Avro schema written in Json string form.
- class feathr.BackfillTime(start: datetime, end: datetime, step: timedelta)[source]
Bases:
object
Time range to materialize/backfill feature data.
- start
start time of the backfill, inclusive.
- end
end time of the backfill, inclusive.
- step
duration of each backfill step. e.g. if daily, use timedelta(days=1)
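The inclusive start/end semantics can be sketched with plain datetime arithmetic (illustration only, not Feathr code):

```python
from datetime import datetime, timedelta

# Enumerate the backfill cutoff points implied by an inclusive
# start/end range and a fixed step, e.g. one point per day.
start = datetime(2022, 3, 1)
end = datetime(2022, 3, 5)
step = timedelta(days=1)

cutoffs = []
t = start
while t <= end:  # both endpoints are inclusive
    cutoffs.append(t)
    t += step

print(cutoffs)  # 2022-03-01 through 2022-03-05, one point per day
```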
- class feathr.BooleanFeatureType[source]
Bases:
FeatureType
Boolean feature value, either true or false.
- class feathr.DerivedFeature(name: str, feature_type: ~feathr.definition.dtype.FeatureType, input_features: ~typing.Union[~feathr.definition.feature.FeatureBase, ~typing.List[~feathr.definition.feature.FeatureBase]], transform: ~typing.Union[str, ~feathr.definition.transformation.RowTransformation], key: ~typing.Optional[~typing.Union[~feathr.definition.typed_key.TypedKey, ~typing.List[~feathr.definition.typed_key.TypedKey]]] = [<feathr.definition.typed_key.TypedKey object>], registry_tags: ~typing.Optional[~typing.Dict[str, str]] = None)[source]
Bases:
FeatureBase
A derived feature is a feature defined on top of other features, rather than external data source.
- name
derived feature name
- feature_type
type of derived feature
- key
All features with corresponding keys that this derived feature depends on
- input_features
features that this derived features depends on
- transform
transformation that produces the derived feature value, based on the input_features
- registry_tags
A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this feature is deprecated, etc.
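As a declarative sketch, a derived feature can be defined on top of another feature (the feature names and the conversion factor here are hypothetical, purely for illustration):

```python
from feathr import DerivedFeature, Feature, FLOAT

# Hypothetical base feature: trip distance in miles.
f_trip_distance = Feature(name="f_trip_distance",
                          feature_type=FLOAT,
                          transform="trip_distance")

# Derived feature computed from the base feature's value.
f_trip_distance_km = DerivedFeature(
    name="f_trip_distance_km",
    feature_type=FLOAT,
    input_features=[f_trip_distance],
    transform="f_trip_distance * 1.6093")
```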
- class feathr.DoubleFeatureType[source]
Bases:
FeatureType
Double feature value, for example, 1.3d, 2.4d. Double has better precision than float.
- class feathr.DoubleVectorFeatureType[source]
Bases:
FeatureType
Double vector feature value, for example, [1.3d, 3.3d, 9.3d]
- class feathr.FeathrClient(config_path: str = './feathr_config.yaml', local_workspace_dir: Optional[str] = None, credential=None, project_registry_tag: Optional[Dict[str, str]] = None)[source]
Bases:
object
Feathr client.
The client is used to create training dataset, materialize features, register features, and fetch features from the online storage.
For offline storage and compute engine, Azure ADLS, AWS S3 and Azure Synapse are supported.
For online storage, currently only Redis is supported. The user of this client is responsible for setting up all the information needed to start a Redis client, via environment variables or a Spark cluster. Host address, port, and password are needed to start the Redis client.
- config_path
config path. See [Feathr Config Template](https://github.com/linkedin/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for more details. Defaults to “./feathr_config.yaml”.
- Type
str, optional
- local_workspace_dir
the local workspace dir. If not set, Feathr will create a temporary folder to store local workspace related files.
- Type
str, optional
- credential
credential to access cloud resources, most likely to be the returned result of DefaultAzureCredential(). If not set, Feathr will initialize DefaultAzureCredential() inside the __init__ function to get credentials.
- Type
optional
- project_registry_tag
tags to add to the project in the Feathr registry. This might be useful if you want to mark your project as deprecated, or allow certain customizations at the project level. Defaults to empty.
- Raises
RuntimeError – Failed to create the client because the environment variables necessary for Redis client creation are not set.
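Creating a client might look like the following (a sketch; it assumes a valid feathr_config.yaml and the required credential environment variables, so it is not runnable as-is):

```python
from feathr import FeathrClient

# Assumes ./feathr_config.yaml exists and the cloud credentials
# needed by the configured backends are available via environment
# variables (e.g. for Redis and the Spark platform).
client = FeathrClient(config_path="./feathr_config.yaml")
```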
- build_features(anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = [], verbose: bool = False)[source]
Build features based on the current workspace. All actions that trigger a Spark job will be based on the result of this action.
- get_features_from_registry(project_name: str) Dict[str, FeatureBase] [source]
Get features from the registry by project name. Features fetched from the registry are automatically built.
- get_offline_features(observation_settings: ObservationSettings, feature_query: Union[FeatureQuery, List[FeatureQuery]], output_path: str, execution_configuratons: Optional[Union[SparkExecutionConfiguration, Dict[str, str]]] = None, udf_files=None, verbose: bool = False)[source]
Get offline features for the observation dataset.
- Parameters
observation_settings – settings of the observation data, e.g. timestamp columns, input path, etc.
feature_query – features that are requested to add onto the observation data
output_path – output path of the job, i.e. the observation data with features attached.
execution_configuratons – a dict that will be passed to the Spark job when the job starts up, i.e. the “spark configurations”. Note that not all of the configurations will be honored, since some are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of Spark configurations.
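A sketch of a feature-join call (the paths, key column, and feature names are hypothetical, and it assumes a configured Feathr deployment, so it is not runnable as-is):

```python
from feathr import (FeathrClient, FeatureQuery, ObservationSettings,
                    TypedKey, ValueType)

# Hypothetical key and feature names for illustration.
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32)
query = FeatureQuery(feature_list=["f_trip_distance", "f_day_of_week"],
                     key=location_id)
settings = ObservationSettings(
    observation_path="abfss://container@storage/observation.csv",
    event_timestamp_column="lpep_dropoff_datetime",
    timestamp_format="yyyy-MM-dd HH:mm:ss")

client = FeathrClient()
client.get_offline_features(observation_settings=settings,
                            feature_query=query,
                            output_path="abfss://container@storage/output")
```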
- get_online_features(feature_table, key, feature_names)[source]
Fetches feature values for a certain key from an online feature table.
- Parameters
feature_table – the name of the feature table.
key – the key of the entity
feature_names – list of feature names to fetch
- Returns
A list of feature values for this entity, ordered by the requested feature names. For example, for feature_names = [‘f_is_medium_trip_distance’, ‘f_day_of_week’, ‘f_day_of_month’, ‘f_hour_of_day’], the returned feature values are: [b’true’, b’4.0’, b’31.0’, b’23.0’]. If the feature_table or key doesn’t exist, a list of Nones is returned, for example, [None, None, None, None]. If a feature doesn’t exist, a None is returned for that feature, for example: [None, b’4.0’, b’31.0’, b’23.0’].
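A usage sketch (the table name, key, and feature names are hypothetical, and it assumes the features have already been materialized to the online store):

```python
from feathr import FeathrClient

client = FeathrClient()
# Fetch two hypothetical features for the entity with key "265"
# from a materialized online table.
values = client.get_online_features(
    feature_table="nycTaxiDemoFeature",
    key="265",
    feature_names=["f_location_avg_fare", "f_location_max_fare"])
# values is ordered by the requested feature names,
# e.g. [b'540.0', b'5763.0']
```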
- list_registered_features(project_name: Optional[str] = None) List[str] [source]
List all the already registered features. If project_name is not provided or is None, it returns all the registered features; otherwise it only returns features under this project.
- materialize_features(settings: MaterializationSettings, execution_configuratons: Optional[Union[SparkExecutionConfiguration, Dict[str, str]]] = None, verbose: bool = False)[source]
Materialize feature data
- Parameters
settings – Feature materialization settings
execution_configuratons – a dict that will be passed to the Spark job when the job starts up, i.e. the “spark configurations”. Note that not all of the configurations will be honored, since some are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of Spark configurations.
- multi_get_online_features(feature_table, keys, feature_names)[source]
Fetches feature values for a list of keys from an online feature table. This is the batch version of the get API.
- Parameters
feature_table – the name of the feature table.
keys – list of keys for the entities
feature_names – list of feature names to fetch
- Returns
A dict mapping each requested key to its list of feature values, each list ordered by the requested feature names. For example, for keys = [12, 24] and feature_names = [‘f_is_medium_trip_distance’, ‘f_day_of_week’, ‘f_day_of_month’, ‘f_hour_of_day’], the returned feature values are: {‘12’: [b’false’, b’5.0’, b’1.0’, b’0.0’], ‘24’: [b’true’, b’4.0’, b’31.0’, b’23.0’]}. If the feature_table or key doesn’t exist, lists of Nones are returned, for example, {‘12’: [None, None, None, None], ‘24’: [None, None, None, None]}. If a feature doesn’t exist, a None is returned for that feature, for example: {‘12’: [None, b’4.0’, b’31.0’, b’23.0’], ‘24’: [b’true’, b’4.0’, b’31.0’, b’23.0’]}.
- register_features(from_context: bool = True)[source]
Registers features based on the current workspace
- Parameters
from_context – If from_context is True (default), the features will be generated from the current context, with the features previously built in client.build_features(). Otherwise, the features will be generated from configuration files.
- class feathr.Feature(name: str, feature_type: ~feathr.definition.dtype.FeatureType, key: ~typing.Optional[~typing.Union[~feathr.definition.typed_key.TypedKey, ~typing.List[~feathr.definition.typed_key.TypedKey]]] = [<feathr.definition.typed_key.TypedKey object>], transform: ~typing.Optional[~typing.Union[str, ~feathr.definition.transformation.Transformation]] = None, registry_tags: ~typing.Optional[~typing.Dict[str, str]] = None)[source]
Bases:
FeatureBase
A feature is an individual measurable property or characteristic of an entity. It has a feature name, feature type, and a convenient row transformation used to produce its feature value.
- name
Unique name of the feature. Only letters, numbers, and ‘_’ are allowed in the name. It cannot start with a number. Note that ‘.’ is NOT ALLOWED!
- feature_type
the feature value type. e.g. INT32, FLOAT, etc. Should be part of feathr.dtype
- key
The key of this feature. e.g. user_id.
- transform
A row transformation used to produce its feature value. e.g. amount * 10
- registry_tags
A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this feature is deprecated, etc.
- class feathr.FeatureAnchor(name: str, source: Source, features: List[Feature], registry_tags: Optional[Dict[str, str]] = None)[source]
Bases:
HoconConvertible
A feature anchor defines a set of features on top of a data source, a.k.a. a set of features anchored to a source.
A feature producer may write multiple anchors for a feature, exposing the same feature name for feature consumers to reference.
- name
Unique name of the anchor.
- source
data source that the features are anchored to. Should be either of INPUT_CONTEXT or feathr.source.Source
- features
list of features defined within this anchor.
- registry_tags
A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this anchor is deprecated, etc.
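A declarative sketch of anchoring request features to the observation data via INPUT_CONTEXT (the feature names and expressions are hypothetical):

```python
from feathr import Feature, FeatureAnchor, FLOAT, INPUT_CONTEXT

# Hypothetical request features computed from the observation data itself.
features = [
    Feature(name="f_trip_distance", feature_type=FLOAT,
            transform="trip_distance"),
    Feature(name="f_trip_time_duration", feature_type=FLOAT,
            transform="(to_unix_timestamp(lpep_dropoff_datetime) - "
                      "to_unix_timestamp(lpep_pickup_datetime)) / 60"),
]

request_anchor = FeatureAnchor(name="request_features",
                               source=INPUT_CONTEXT,
                               features=features)
```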
- exception feathr.FeatureNameValidationError[source]
Bases:
ValueError
An exception for feature name validation. Feature names must consist of letters, numbers, or underscores, and cannot begin with a number. Periods are also disallowed, as some compute engines, such as Spark, treat them as operators in feature names.
- class feathr.FeaturePrinter[source]
Bases:
object
The class for pretty-printing features
- static pretty_print_anchors(anchor_list: List[FeatureAnchor]) None [source]
Pretty print a list of feature anchors
- Parameters
anchor_list – list of FeatureAnchor to print
- static pretty_print_feature_query(feature_query: FeatureQuery) None [source]
Pretty print feature query
- Parameters
feature_query – feature query
- static pretty_print_materialize_features(settings: MaterializationSettings) None [source]
Pretty print materialization settings
- Parameters
settings – MaterializationSettings to print
- class feathr.FeatureQuery(feature_list: List[str], key: Optional[Union[TypedKey, List[TypedKey]]] = None)[source]
Bases:
HoconConvertible
A FeatureQuery contains a list of features
- feature_list
a list of feature names
- key
key of feature_list, all features must share the same key
- class feathr.FloatFeatureType[source]
Bases:
FeatureType
Float feature value, for example, 1.3f, 2.4f.
- class feathr.FloatVectorFeatureType[source]
Bases:
FeatureType
Float vector feature value, for example, [1.3f, 2.4f, 3.9f]
- class feathr.HdfsSource(name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = 'epoch', registry_tags: Optional[Dict[str, str]] = None)[source]
Bases:
Source
A data source (table) stored on an HDFS-like file system. Data can be fetched through a POSIX-style path.
- preprocessing
A preprocessing python function that transforms the source data for further feature transformation.
- Type
Optional[Callable]
- event_timestamp_column
The timestamp field of your record, as sliding window aggregation features assume each record in the source data has a timestamp column.
- Type
Optional[str]
- timestamp_format
The format of the timestamp field. Defaults to “epoch”. Possible values are: - epoch (seconds since epoch), for example 1647737463 - epoch_millis (milliseconds since epoch), for example 1647737517761 - Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
- Type
Optional[str], optional
- registry_tags
A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this source is deprecated, etc.
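A declarative sketch of an HDFS-style source with a preprocessing UDF (the path, column names, and UDF are hypothetical; the UDF receives and returns a Spark DataFrame):

```python
from feathr import HdfsSource

# Hypothetical preprocessing UDF; it runs on the source DataFrame
# before feature transformations are applied.
def add_new_fare_amount(df):
    from pyspark.sql.functions import col
    return df.withColumn("fare_amount_new", col("fare_amount") + 1000)

batch_source = HdfsSource(
    name="nycTaxiBatchSource",
    path="abfss://container@storage/green_tripdata_2020-04.csv",
    preprocessing=add_new_fare_amount,
    event_timestamp_column="lpep_dropoff_datetime",
    timestamp_format="yyyy-MM-dd HH:mm:ss")
```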
- class feathr.InputContext[source]
Bases:
Source
‘Passthrough’ source, a.k.a. request feature source. Feature source data comes from the observation data itself. For example, you have an observation data table t1 and another feature data table t2. Some of your feature data, such as geo location, can be transformed from the observation data table t1 itself; you can define such features on top of the InputContext.
- class feathr.Int32FeatureType[source]
Bases:
FeatureType
32-bit integer feature value, for example, 123, 98765.
- class feathr.Int32VectorFeatureType[source]
Bases:
FeatureType
32-bit integer vector feature value, for example, [1, 3, 9]
- class feathr.Int64FeatureType[source]
Bases:
FeatureType
64-bit integer (a.k.a. Long in some systems) feature value, for example, 123, 98765, but stored as a 64-bit integer.
- class feathr.Int64VectorFeatureType[source]
Bases:
FeatureType
64-bit integer vector feature value, for example, [1, 3, 9]
- class feathr.KafKaSource(name: str, kafkaConfig: KafkaConfig)[source]
Bases:
Source
A Kafka source object, used in streaming feature ingestion.
- class feathr.KafkaConfig(brokers: List[str], topics: List[str], schema: SourceSchema)[source]
Bases:
object
Kafka config for a streaming source
- brokers
broker/server address
- topics
Kafka topics
- schema
Kafka message schema
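A declarative sketch wiring KafkaConfig, KafKaSource, and AvroJsonSchema together (broker address, topic, and schema are hypothetical):

```python
from feathr import AvroJsonSchema, KafkaConfig, KafKaSource

# Hypothetical Avro schema for the Kafka messages.
schema = AvroJsonSchema(schemaStr="""
{"type": "record", "name": "DriverTrips", "fields": [
    {"name": "driver_id", "type": "long"},
    {"name": "trips_today", "type": "int"}
]}""")

stream_source = KafKaSource(
    name="kafkaStreamingSource",
    kafkaConfig=KafkaConfig(brokers=["my-broker.example.com:9092"],
                            topics=["driver-trips"],
                            schema=schema))
```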
- class feathr.LookupFeature(name: str, feature_type: ~feathr.definition.dtype.FeatureType, base_feature: ~feathr.definition.feature.FeatureBase, expansion_feature: ~feathr.definition.feature.FeatureBase, aggregation: ~feathr.definition.aggregation.Aggregation, key: ~typing.Optional[~typing.Union[~feathr.definition.typed_key.TypedKey, ~typing.List[~feathr.definition.typed_key.TypedKey]]] = [<feathr.definition.typed_key.TypedKey object>], registry_tags: ~typing.Optional[~typing.Dict[str, str]] = None)[source]
Bases:
FeatureBase
A lookup feature is a feature defined on top of two other features, i.e. using the feature value of the base feature as the key to look up the feature value from the expansion feature. For example, a lookup feature user_purchased_item_avg_price could be keyed by user_id and computed as follows: the base feature is user_purchased_item_ids (for a given user_id, it returns the ids of the items purchased by the user); the expansion feature is item_price (for a given item id, it returns the item price); the aggregation function is the average of the item prices.
- name
Derived feature name
- feature_type
Type of derived feature
- key
Join key of the derived feature
- base_feature
The feature value of this feature will be used as key to lookup from the expansion feature
- expansion_feature
The feature to be looked up
- aggregation
The aggregation applied to the feature values looked up from the expansion feature, for the case where the base feature value is converted into multiple lookup keys, e.g. the feature value is an array and each element in the array is used once as a lookup key.
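The user_purchased_item_avg_price example above can be sketched declaratively as follows (the base and expansion features are minimal hypothetical stand-ins):

```python
from feathr import Aggregation, Feature, FLOAT, FLOAT_VECTOR, LookupFeature

# Hypothetical features for illustration.
user_purchased_item_ids = Feature(name="user_purchased_item_ids",
                                  feature_type=FLOAT_VECTOR)
item_price = Feature(name="item_price", feature_type=FLOAT)

user_purchased_item_avg_price = LookupFeature(
    name="user_purchased_item_avg_price",
    feature_type=FLOAT,
    base_feature=user_purchased_item_ids,  # its values supply the lookup keys
    expansion_feature=item_price,          # feature to look up
    aggregation=Aggregation.AVG)           # average the looked-up prices
```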
- class feathr.MaterializationSettings(name: str, sinks: List[Sink], feature_names: List[str], backfill_time: Optional[BackfillTime] = None)[source]
Bases:
object
Settings for feature materialization.
- name
The materialization job name
- sinks
sinks where the materialized features should be written to
- feature_names
list of feature names to be materialized
- backfill_time
time range and frequency for the materialization. Defaults to now().
- get_backfill_cutoff_time() List[datetime] [source]
Get the backfill cutoff time points for materialization. E.g. for BackfillTime(start=datetime(2022, 3, 1), end=datetime(2022, 3, 5), step=timedelta(days=1)), it returns the cutoff time list [2022-3-1, 2022-3-2, 2022-3-3, 2022-3-4, 2022-3-5]; for BackfillTime(start=datetime(2022, 3, 1, 1), end=datetime(2022, 3, 1, 5), step=timedelta(hours=1)), it returns the cutoff time list [2022-3-1 01:00:00, 2022-3-1 02:00:00, 2022-3-1 03:00:00, 2022-3-1 04:00:00, 2022-3-1 05:00:00].
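A declarative sketch combining MaterializationSettings with a backfill window and a Redis sink (the job, table, and feature names are hypothetical):

```python
from datetime import datetime, timedelta
from feathr import BackfillTime, MaterializationSettings, RedisSink

# Daily backfill over an inclusive five-day window.
backfill = BackfillTime(start=datetime(2022, 3, 1),
                        end=datetime(2022, 3, 5),
                        step=timedelta(days=1))

settings = MaterializationSettings(
    name="nycTaxiMaterializationJob",
    sinks=[RedisSink(table_name="nycTaxiDemoFeature")],
    feature_names=["f_location_avg_fare", "f_location_max_fare"],
    backfill_time=backfill)
# settings.get_backfill_cutoff_time() would return five daily cutoff points.
```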
- class feathr.ObservationSettings(observation_path: str, event_timestamp_column: Optional[str] = None, timestamp_format: str = 'epoch')[source]
Bases:
HoconConvertible
Time settings of the observation data. Used in feature join.
- observation_path
path to the observation dataset, i.e. the input dataset to join features onto.
- event_timestamp_column
The timestamp field of your record, as sliding window aggregation features assume each record in the source data has a timestamp column.
- Type
Optional[str]
- timestamp_format
The format of the timestamp field. Defaults to “epoch”. Possible values are: - epoch (seconds since epoch), for example 1647737463 - epoch_millis (milliseconds since epoch), for example 1647737517761 - Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
- Type
Optional[str], optional
- class feathr.RedisSink(table_name: str, streaming: bool = False, streamingTimeoutMs: Optional[int] = None)[source]
Bases:
Sink
Redis-based sink used to store online feature data; can be used in a batch job or streaming job.
- table_name
output table name
- streaming
whether it is used in streaming mode
- streamingTimeoutMs
maximum running time for streaming mode. It is not used in batch mode.
- class feathr.Source(name: str, event_timestamp_column: Optional[str] = '0', timestamp_format: Optional[str] = 'epoch', registry_tags: Optional[Dict[str, str]] = None)[source]
Bases:
HoconConvertible
External data source for features. Typically a data ‘table’.
- name
name of the source. It’s used to differentiate from other sources.
- event_timestamp_column
column name or field name of the event timestamp
- timestamp_format
the format of the event_timestamp_column, e.g. yyyy/MM/DD, or EPOCH
- registry_tags
A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {“deprecated”: “true”} to indicate this source is deprecated, etc.
- class feathr.SparkExecutionConfiguration(spark_execution_configuration=typing.Dict[str, str])[source]
Bases:
object
A wrapper class to enable Spark execution configurations which will be passed to the underlying Spark engine.
- spark_execution_configuration
dict[str, str] which will be passed to the underlying Spark engine
- Returns
dict[str, str]
- class feathr.StringFeatureType[source]
Bases:
FeatureType
String feature value, for example, ‘apple’, ‘orange’.
- class feathr.TypedKey(key_column: str, key_column_type: ValueType, full_name: Optional[str] = None, description: Optional[str] = None, key_column_alias: Optional[str] = None)[source]
Bases:
object
The key of a feature. A feature is typically keyed by some id(s), e.g. product id, user id.
- key_column
The id column name of this key, e.g. ‘product_id’.
- key_column_type
Types of the key_column
- full_name
Unique name of the key. Recommend using [project_name].[key_name], e.g. ads.user_id
- description
Documentation for the key.
- key_column_alias
Used in some advanced derived features. Default to the key_column.
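A declarative sketch of a typed key (the column name and names are hypothetical):

```python
from feathr import TypedKey, ValueType

# Hypothetical key describing how features are keyed by location id.
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32,
                       full_name="nyc_taxi.location_id",
                       description="location id in NYC")
```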
- class feathr.ValueType(value)[source]
Bases:
Enum
Data type to describe feature keys or observation keys.
- UNSPECIFIED
key data type is unspecified.
- BOOL
key data type is boolean, either true or false
- INT32
key data type is 32-bit integer, for example, an invoice id, 93231.
- INT64
key data type is 64-bit integer, for example, an invoice id, 93231.
- FLOAT
key data type is float, for example, 123.4f.
- DOUBLE
key data type is double, for example, 123.4d.
- STRING
key data type is string, for example, a user name, ‘user_joe’
- BYTES
key data type is bytes.
- BOOL = 1
- BYTES = 7
- DOUBLE = 5
- FLOAT = 4
- INT32 = 2
- INT64 = 3
- STRING = 6
- UNSPECIFIED = 0
- class feathr.WindowAggTransformation(agg_expr: str, agg_func: str, window: str, group_by: Optional[str] = None, filter: Optional[str] = None, limit: Optional[int] = None)[source]
Bases:
Transformation
Aggregate the value of an expression over a fixed time window. E.g. sum(amount*10) over last 3 days.
- agg_expr
expression that transforms the raw value into a new value, e.g. amount * 10
- agg_func
aggregation function. Available values: SUM, COUNT, MAX, MIN, AVG, MAX_POOLING, MIN_POOLING, AVG_POOLING, LATEST
- window
Time window length to apply the aggregation. Supports 4 types of units: d (day), h (hour), m (minute), s (second). Example values are “7d”, “5h”, “3m”, or “1s”.
- group_by
Feathr expression applied after the agg_expr transformation as the group-by field, before aggregation; same as ‘group by’ in SQL.
- filter
Feathr expression applied to each row as a filter before aggregation
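A declarative sketch of a sliding-window aggregation feature (the key, column, and feature name are hypothetical):

```python
from feathr import (Feature, FLOAT, TypedKey, ValueType,
                    WindowAggTransformation)

# Hypothetical key for illustration.
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32)

# Average fare per location over the last 90 days.
f_location_avg_fare = Feature(
    name="f_location_avg_fare",
    key=location_id,
    feature_type=FLOAT,
    transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                      agg_func="AVG",
                                      window="90d"))
```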
- feathr.get_result_df(client: FeathrClient, format: Optional[str] = None, res_url: Optional[str] = None) DataFrame [source]
Download the job result dataset from cloud as a Pandas dataframe.
- Parameters
format – format override, could be “parquet”, “delta”, etc.
res_url – output URL to download files. Note that this will not block on the job, so you need to make sure the job is finished and the result URL contains actual data.