Contents

ML Metadata - the version control for ML

0. What is MLMD

MLMD is a library for tracking the full lineage of your entire ML workflow. Full lineage covers every step of that workflow: data ingestion, data preprocessing, validation, training, evaluation, deployment, and so on.

1. Basic imports

1
2
3
# NumPy handles the raw arrays; TensorFlow provides the model and dataset APIs.
import numpy as np
import tensorflow as tf
# Print the installed TensorFlow version.
print(tf.__version__)

2. Connect to metadata store

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import ml_metadata as mlmd
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2
# Describe a SQLite-backed metadata store kept in the file 'mlmd.sqlite'.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'mlmd.sqlite'
# Named enum constant instead of the bare number 3 (READWRITE_OPENCREATE).
connection_config.sqlite.connection_mode = (
    metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE
)

# Open the store using the configuration above.
store = metadata_store.MetadataStore(connection_config)

print(mlmd.__version__)
print(store)

3. Define the schemas (this should be done ONLY once; it is like “CREATE TABLE xyz (…)”)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Every property declared below is registered as a STRING property.

# ArtifactType describing an input dataset.
data_type = metadata_store_pb2.ArtifactType(name="DataSet")
for prop_name in ("shape", "description", "split"):
    data_type.properties[prop_name] = metadata_store_pb2.STRING
data_type_id = store.put_artifact_type(data_type)

# ArtifactType describing an output (saved) model.
model_type = metadata_store_pb2.ArtifactType(name="SavedModel")
for prop_name in ("version", "name", "framework"):
    model_type.properties[prop_name] = metadata_store_pb2.STRING
model_type_id = store.put_artifact_type(model_type)

# ArtifactType describing a recorded metric.
metric_type = metadata_store_pb2.ArtifactType(name="Metrics")
for prop_name in ("name", "value"):
    metric_type.properties[prop_name] = metadata_store_pb2.STRING
metric_type_id = store.put_artifact_type(metric_type)

# An Execution is like running a KFP component.
trainer_type = metadata_store_pb2.ExecutionType(name="Trainer")
trainer_type.properties["state"] = metadata_store_pb2.STRING
trainer_type_id = store.put_execution_type(trainer_type)

# A Context is like an Experiment.
experiment_type = metadata_store_pb2.ContextType(name="Experiment")
for prop_name in ("description", "maintainer", "env"):
    experiment_type.properties[prop_name] = metadata_store_pb2.STRING
experiment_type_id = store.put_context_type(experiment_type)

4. Build a simple ML model

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# A small dense classifier: flatten each 28x28 input, one hidden ReLU layer,
# then 10 raw (un-normalised) outputs.
model_layers = [
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10),
]
model = tf.keras.models.Sequential(model_layers)

# The final layer has no activation, so the loss is told to expect logits.
adam = tf.keras.optimizers.Adam(0.001)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Shorthand alternative: metrics=['sparse_categorical_accuracy']
accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy()
model.compile(optimizer=adam, loss=loss_fn, metrics=[accuracy_metric])
print(model)

5. Load the input MNIST dataset for Training/Testing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
SHUFFLE_BUFFER_SIZE = 100
BATCH_SIZE = 64

# 1. Download the raw dataset: an .npz archive holding the four arrays below.
path = tf.keras.utils.get_file('mnist.npz', 'https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz')
with np.load(path) as data:
    x_train = data['x_train']
    y_train = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']

# 2. Create a shuffled, batched dataset for training.
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
ds_train = ds_train.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)

# 3. Create a dataset for testing. It is batched like the training set so it
#    can be fed straight to model.evaluate()/model.predict(), which expect
#    batched elements. It is deliberately not shuffled.
ds_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
ds_test = ds_test.batch(BATCH_SIZE)


print(ds_train)

6. Create input Artifacts for training/testing

Here we create two records, one for “train” and one for “test”.

Think of it as “INSERT INTO TABLE xyz VALUES (…)”.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Artifact record for the training split. Its properties match those declared
# on the "DataSet" ArtifactType (shape, split, description).
train_artifact = metadata_store_pb2.Artifact()
train_artifact.uri = 'gs://abc/step1_output_train_data'
train_artifact.properties["shape"].string_value = '(5000, 28, 28)'
train_artifact.properties["split"].string_value = 'train'
train_artifact.properties["description"].string_value = 'this is the training data randomly sampled from default MNIST data with seed=13'
train_artifact.type_id = store.get_artifact_type(type_name="DataSet").id
# put_artifacts returns one id per submitted artifact.
[train_artifact_id] = store.put_artifacts([train_artifact])


# Artifact record for the test split.
test_artifact = metadata_store_pb2.Artifact()
test_artifact.uri = 'gs://abc/step2-load-data/step1_output_test_data'
# The shape belongs on the test artifact. Setting it on train_artifact here
# would leave the test record without a shape and corrupt the local train copy.
test_artifact.properties["shape"].string_value = '(1000, 28, 28)'
test_artifact.properties["split"].string_value = 'test'
test_artifact.properties["description"].string_value = 'select * from table X where timestamp == 08022022'
test_artifact.type_id = store.get_artifact_type(type_name='DataSet').id
[test_artifact_id] = store.put_artifacts([test_artifact])

print(train_artifact)

7. Create an Execution to mark the beginning

1
2
3
4
5
6
7
# Record one run of the Trainer as an Execution that is currently RUNNING.
trainer_run = metadata_store_pb2.Execution(
    type_id=trainer_type_id,  # <--- from the ExecutionType()
)
trainer_run.properties["state"].string_value = "RUNNING"

# Exactly one execution is submitted, so exactly one id is unpacked.
(run_id,) = store.put_executions([trainer_run])

print(trainer_run)

8. Record the input event, Execution <— Artifact (Dataset)

1
2
3
4
5
6
7
8
9
# Declare the input event
input_event = metadata_store_pb2.Event()
input_event.artifact_id = train_artifact_id
input_event.execution_id = run_id
input_event.type = metadata_store_pb2.Event.DECLARED_INPUT
# Submit input event to the Metadata Store
store.put_events([input_event])

print(input_event)

9. Train & save the model

1
2
3
# Where the trained model is written.
model_save_path = 'gs://abc/saved-model'

# Train with Keras defaults (no epoch count is passed) and keep the returned
# History object so the recorded metric values can be read afterwards.
history = model.fit(ds_train)
model.save(model_save_path)

10. Create an Artifact of trained and saved model

1
2
3
4
5
6
7
# Artifact record pointing at the saved model, typed as "SavedModel".
model_artifact = metadata_store_pb2.Artifact(
    uri=model_save_path,
    type_id=model_type_id,
)
for prop_key, prop_text in (
    ("version", '1.0.1'),
    ("name", 'model01'),
    ("framework", 'tf-2.9.1'),
):
    model_artifact.properties[prop_key].string_value = prop_text

# One artifact submitted, one id returned.
(model_artifact_id,) = store.put_artifacts([model_artifact])

11. Record the output event, Execution —> Artifact (saved model)

1
2
3
4
5
# Link the saved-model artifact to the trainer run as a declared output.
output_event = metadata_store_pb2.Event(
    artifact_id=model_artifact_id,
    execution_id=run_id,
    type=metadata_store_pb2.Event.DECLARED_OUTPUT,
)
store.put_events([output_event])

12. Update the Execution to mark the end

1
2
3
# Flip the run's state and re-submit it. The id returned when the run was first
# stored is attached so the store can match it to the existing execution
# (per MLMD's put_executions contract, an execution with an id is an update).
trainer_run.properties["state"].string_value = "COMPLETED"
trainer_run.id = run_id
store.put_executions([trainer_run])

13. Create an Artifact of metrics

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Accuracy values recorded by Keras in the History object from model.fit().
metrics = history.history['sparse_categorical_accuracy']
print(metrics)

# Now save the metric to GCS so it persists.
from tensorflow.python.lib.io import file_io
filename = 'gs://abc/123.csv'
# Open the remote file in a `with` block so the handle is always closed.
# Without the close, buffered data may never be flushed to the destination.
with file_io.FileIO(filename, 'w') as metrics_file:
    np.savetxt(metrics_file, np.array(metrics), delimiter=",")

# Artifact record pointing at the CSV file. The values are also stored inline
# as a string property.
metric_artifact = metadata_store_pb2.Artifact()
metric_artifact.uri = filename
metric_artifact.properties["name"].string_value = 'sparse_categorical_accuracy'
metric_artifact.properties["value"].string_value = f'{metrics}'
metric_artifact.type_id = metric_type_id
[metric_artifact_id] = store.put_artifacts([metric_artifact])

14. Put them together into a Context (Experiment)

A context is like an experiment: it groups together all the details of one run of an ML pipeline.

1
2
3
4
5
6
7
8
# Group the model and the trainer run to an experiment.
my_experiment = metadata_store_pb2.Context()
my_experiment.type_id = experiment_type_id  # <--- declared before
my_experiment.name = "exp1"
my_experiment.properties["description"].string_value = "My 3rd experiment that utilize feature A "
my_experiment.properties["maintainer"].string_value = "someone@example.com"
my_experiment.properties["env"].string_value = "development"
[experiment_id] = store.put_contexts([my_experiment])

14.1 attribution: Artifact <—> Context(Experiment)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# One Attribution per artifact, each tying that artifact to the experiment
# context. Order: model, train data, test data, metrics.
attribution_model, attribution_train, attribution_test, attribution_metric = [
    metadata_store_pb2.Attribution(
        artifact_id=linked_artifact_id,
        context_id=experiment_id,
    )
    for linked_artifact_id in (
        model_artifact_id,
        train_artifact_id,
        test_artifact_id,
        metric_artifact_id,
    )
]

14.2 association: Execution <—> Context(Experiment)

1
2
3
# Tie the trainer run (an Execution) to the experiment context.
association = metadata_store_pb2.Association(
    execution_id=run_id,
    context_id=experiment_id,
)

14.3 add attribution and association to the Context

1
# Submit every artifact-to-context attribution and the execution-to-context
# association in a single call.
experiment_attributions = [
    attribution_model,
    attribution_train,
    attribution_test,
    attribution_metric,
]
store.put_attributions_and_associations(experiment_attributions, [association])

Now we have finished all the metadata recording

1. How do we get a specific experiment?

2. How do we reproduce that experiment?

Connect to MLMD store

1
2
3
4
5
6
7
import ml_metadata as mlmd
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2
# Point at the same SQLite file that the recording steps wrote to.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'mlmd.sqlite'
# Named enum constant instead of the bare number 3 (READWRITE_OPENCREATE).
connection_config.sqlite.connection_mode = (
    metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE
)
store = metadata_store.MetadataStore(connection_config)

We can iterate over all experiments and pick out the one we want by inspecting the output.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Context lookups offered by the store:
#   get_contexts, get_contexts_by_artifact, get_contexts_by_execution,
#   get_contexts_by_id, get_contexts_by_type
# Related artifact lookups: get_artifacts_by_context, get_artifacts_by_type.

# Walk every stored Context and dump the artifacts attributed to it, so the
# experiment of interest can be found by reading the output.
experiments = store.get_contexts()
for exp in experiments:
    print(exp.name)
    # Query by the id of the context being visited. Using a fixed id here would
    # print the same artifacts under every name and would depend on a variable
    # that does not exist in a fresh session.
    artifacts = store.get_artifacts_by_context(exp.id)
    for art in artifacts:
        print(art)
        print('#'*30)

Get the experiments of a specific type

1
2
# Fetch the Contexts whose type is named 'Experiment'. The call is left as a
# bare expression so that a notebook shows its result as the cell output.
store.get_contexts_by_type(type_name='Experiment')
#print(experiment_id)
1
2
# Fetch the Executions associated with the experiment context and print them.
# NOTE(review): experiment_id is assigned when the Context is first stored; in a
# fresh session it presumably has to be looked up again first - confirm.
experiment_execution = store.get_executions_by_context(experiment_id)
print(experiment_execution)

get all artifacts from a specific experiment

1
2
3
# Filter on the attached context: type "Experiment", name "exp1".
exp1_filter = 'contexts_a.type = "Experiment" AND contexts_a.name = "exp1"'
# Left as a bare expression so that a notebook shows the result.
store.get_artifacts(list_options=mlmd.ListOptions(filter_query=exp1_filter))

How can we reproduce an experiment exactly?

Load the model and make a prediction

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Find the SavedModel artifact attached to experiment "exp1" and take the
# first match.
saved_model_filter = 'type = "SavedModel" AND contexts_a.type = "Experiment" AND contexts_a.name = "exp1"'
matching_models = store.get_artifacts(
    list_options=mlmd.ListOptions(filter_query=saved_model_filter)
)
model_artifact = matching_models[0]

# Load the model from the location stored on the artifact.
loaded_model = tf.keras.models.load_model(model_artifact.uri)
print(f'The realoded model: {loaded_model}')

# Create a random input: a single 28x28 sample.
test_data = np.random.random((1,28,28))
model_outputs = loaded_model.predict(test_data, verbose=0)
# The predicted label is the index of the largest output.
label = np.argmax(model_outputs)
print(f'Predicted Label: {label}')