MLMD (ML Metadata) is a library for tracking the full lineage of your ML workflow: every step from data ingestion and preprocessing through validation, training, evaluation, deployment, and so on.
import numpy as np
import tensorflow as tf
print(tf.__version__)
import ml_metadata as mlmd
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'mlmd.sqlite'
connection_config.sqlite.connection_mode = 3 # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)
print(mlmd.__version__)
print(store)
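The store can also be backed by MySQL instead of SQLite; a minimal sketch, where the host, database name, and credentials below are placeholders:
# Hypothetical MySQL-backed configuration (placeholder host and credentials)
mysql_config = metadata_store_pb2.ConnectionConfig()
mysql_config.mysql.host = 'mlmd-db.example.com'
mysql_config.mysql.port = 3306
mysql_config.mysql.database = 'mlmd'
mysql_config.mysql.user = 'mlmd_user'
mysql_config.mysql.password = 'change-me'
# mysql_store = metadata_store.MetadataStore(mysql_config)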
# Create an ArtifactType for the input training data
data_type = metadata_store_pb2.ArtifactType()
data_type.name = "DataSet"
data_type.properties["shape"] = metadata_store_pb2.STRING
data_type.properties["description"] = metadata_store_pb2.STRING
data_type.properties["split"] = metadata_store_pb2.STRING
data_type_id = store.put_artifact_type(data_type)
# Create an ArtifactType for the output model
model_type = metadata_store_pb2.ArtifactType()
model_type.name = "SavedModel"
model_type.properties["version"] = metadata_store_pb2.STRING
model_type.properties["name"] = metadata_store_pb2.STRING
model_type.properties["framework"] = metadata_store_pb2.STRING
model_type_id = store.put_artifact_type(model_type)
# Create an ArtifactType for the metrics
metric_type = metadata_store_pb2.ArtifactType()
metric_type.name = "Metrics"
metric_type.properties["name"] = metadata_store_pb2.STRING
metric_type.properties["value"] = metadata_store_pb2.STRING
metric_type_id = store.put_artifact_type(metric_type)
# An Execution is like one run of a KFP component
trainer_type = metadata_store_pb2.ExecutionType()
trainer_type.name = "Trainer"
trainer_type.properties["state"] = metadata_store_pb2.STRING
trainer_type_id = store.put_execution_type(trainer_type)
# A Context is like an Experiment
experiment_type = metadata_store_pb2.ContextType()
experiment_type.name = "Experiment"
experiment_type.properties["description"] = metadata_store_pb2.STRING
experiment_type.properties["maintainer"] = metadata_store_pb2.STRING
experiment_type.properties["env"] = metadata_store_pb2.STRING
experiment_type_id = store.put_context_type(experiment_type)
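Re-running these put_*_type calls with identical definitions should return the existing type ids rather than create duplicates, and the registered types can be read back as a sanity check:
# Read the registered types back from the store
print(store.get_artifact_type(type_name="DataSet"))
print(store.get_execution_type(type_name="Trainer"))
print(store.get_context_type(type_name="Experiment"))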
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10)
])
model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],  # i.e. metrics=['sparse_categorical_accuracy']
)
print(model)
SHUFFLE_BUFFER_SIZE = 100
BATCH_SIZE = 64
# 1. download the raw dataset
path = tf.keras.utils.get_file('mnist.npz', 'https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz')
with np.load(path) as data:
    x_train = data['x_train']
    y_train = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']
# 2. create a dataset for train
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
ds_train = ds_train.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
# 3. create a dataset for test
ds_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
print(ds_train)
Here we create two DataSet Artifact records, one for the “train” split and one for the “test” split.
train_artifact = metadata_store_pb2.Artifact()
train_artifact.uri = 'gs://abc/step1_output_train_data'
train_artifact.properties["shape"].string_value = '(5000, 28, 28)'
train_artifact.properties["split"].string_value = 'train'
train_artifact.properties["description"].string_value = 'this is the training data randomly sampled from default MNIST data with seed=13'
train_artifact.type_id = store.get_artifact_type(type_name="DataSet").id
[train_artifact_id] = store.put_artifacts([train_artifact])
test_artifact = metadata_store_pb2.Artifact()
test_artifact.uri = 'gs://abc/step2-load-data/step1_output_test_data'
test_artifact.properties["shape"].string_value = '(1000, 28, 28)'
test_artifact.properties["split"].string_value = 'test'
test_artifact.properties["description"].string_value = 'select * from table X where timestamp == 08022022'
test_artifact.type_id = store.get_artifact_type(type_name='DataSet').id
[test_artifact_id] = store.put_artifacts([test_artifact])
print(train_artifact)
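Artifacts can also be looked up later by their uri; a quick check on the training split we just registered:
# Fetch the artifact back by uri
print(store.get_artifacts_by_uri('gs://abc/step1_output_train_data'))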
# Register the Execution of a Trainer run
trainer_run = metadata_store_pb2.Execution()
trainer_run.type_id = trainer_type_id # <--- from the ExecutionType()
trainer_run.properties["state"].string_value = "RUNNING"
[run_id] = store.put_executions([trainer_run])
print(trainer_run)
# Declare the input event
input_event = metadata_store_pb2.Event()
input_event.artifact_id = train_artifact_id
input_event.execution_id = run_id
input_event.type = metadata_store_pb2.Event.DECLARED_INPUT
# Submit input event to the Metadata Store
store.put_events([input_event])
print(input_event)
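We can read the event back to confirm the dataset is now linked to the trainer run:
# Events recorded for this execution so far (should show the declared input)
print(store.get_events_by_execution_ids([run_id]))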
history = model.fit(ds_train)
model_save_path = 'gs://abc/saved-model'
model.save(model_save_path)
10. Create an Artifact for the trained and saved model
model_artifact = metadata_store_pb2.Artifact()
model_artifact.uri = model_save_path
model_artifact.properties["version"].string_value = '1.0.1'
model_artifact.properties["name"].string_value = 'model01'
model_artifact.properties["framework"].string_value = 'tf-2.9.1'
model_artifact.type_id = model_type_id
[model_artifact_id] = store.put_artifacts([model_artifact])
# Declare the output event and submit it to the Metadata Store
output_event = metadata_store_pb2.Event()
output_event.artifact_id = model_artifact_id
output_event.execution_id = run_id
output_event.type = metadata_store_pb2.Event.DECLARED_OUTPUT
store.put_events([output_event])
# Reuse the id so put_executions updates the existing record rather than creating a new one
trainer_run.id = run_id
trainer_run.properties["state"].string_value = "COMPLETED"
store.put_executions([trainer_run])
metrics = history.history['sparse_categorical_accuracy']
print(metrics)
# now we save the metric values to GCS for persistence
from tensorflow.python.lib.io import file_io
filename = 'gs://abc/123.csv'
np.savetxt(file_io.FileIO(filename, 'w'), np.array(metrics), delimiter=",")
metric_artifact = metadata_store_pb2.Artifact()
metric_artifact.uri = filename
metric_artifact.properties["name"].string_value = 'sparse_categorical_accuracy'
metric_artifact.properties["value"].string_value = f'{metrics}'
metric_artifact.type_id = metric_type_id
[metric_artifact_id] = store.put_artifacts([metric_artifact])
# Group the model and the trainer run under an experiment.
my_experiment = metadata_store_pb2.Context()
my_experiment.type_id = experiment_type_id # <--- declared before
my_experiment.name = "exp1"
my_experiment.properties["description"].string_value = "My 3rd experiment that utilizes feature A"
my_experiment.properties["maintainer"].string_value = "someone@example.com"
my_experiment.properties["env"].string_value = "development"
[experiment_id] = store.put_contexts([my_experiment])
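Context names must be unique per type, so re-running the cell above would fail; if "exp1" may already exist, one option is to look it up first (a sketch, not part of the run above):
# Reuse the experiment if it already exists, otherwise create it
existing = store.get_context_by_type_and_name('Experiment', 'exp1')
if existing is not None:
    experiment_id = existing.id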
# An Attribution links an Artifact to a Context
attribution_model = metadata_store_pb2.Attribution()
attribution_model.artifact_id = model_artifact_id
attribution_model.context_id = experiment_id
attribution_train = metadata_store_pb2.Attribution()
attribution_train.artifact_id = train_artifact_id
attribution_train.context_id = experiment_id
attribution_test = metadata_store_pb2.Attribution()
attribution_test.artifact_id = test_artifact_id
attribution_test.context_id = experiment_id
attribution_metric = metadata_store_pb2.Attribution()
attribution_metric.artifact_id = metric_artifact_id
attribution_metric.context_id = experiment_id
# An Association links an Execution to a Context
association = metadata_store_pb2.Association()
association.execution_id = run_id
association.context_id = experiment_id
14.3 Add the attributions and the association to the Context
store.put_attributions_and_associations([attribution_model, attribution_train, attribution_test, attribution_metric], [association])
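To double-check the grouping, the Context can be looked up from either side of the new links:
# Contexts attributed to the model artifact and associated with the trainer run
print(store.get_contexts_by_artifact(model_artifact_id))
print(store.get_contexts_by_execution(run_id))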
import ml_metadata as mlmd
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'mlmd.sqlite'
connection_config.sqlite.connection_mode = 3 # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)
We can iterate over all experiments and find the one we want by inspection:
# Other useful read APIs include: get_contexts, get_contexts_by_artifact,
# get_contexts_by_execution, get_contexts_by_id, get_contexts_by_type,
# get_artifacts_by_type, get_artifacts_by_context.
experiments = store.get_contexts()
for exp in experiments:
    print(exp.name)
# Fetch a specific context by id (example id)
context = store.get_contexts_by_id([14])
store.get_executions_by_context(experiment_id)
artifacts = store.get_artifacts_by_context(experiment_id)
for art in artifacts:
    print(art)
    print('#' * 30)
store.get_contexts_by_type(type_name='Experiment')
experiment_execution = store.get_executions_by_context(experiment_id)
print(experiment_execution)
store.get_artifacts(
list_options = mlmd.ListOptions(
filter_query=('contexts_a.type = "Experiment" AND contexts_a.name = "exp1"')))
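Beyond attribute filters, the Events we logged let us walk real lineage, e.g. from the model back to the data it was trained on. A minimal sketch, assuming model_artifact_id from the earlier steps (or fetched via one of the queries above):
# Walk lineage: model artifact -> producing execution -> its declared inputs
model_events = store.get_events_by_artifact_ids([model_artifact_id])
producer_ids = [e.execution_id for e in model_events
                if e.type == metadata_store_pb2.Event.DECLARED_OUTPUT]
run_events = store.get_events_by_execution_ids(producer_ids)
input_ids = [e.artifact_id for e in run_events
             if e.type == metadata_store_pb2.Event.DECLARED_INPUT]
for artifact in store.get_artifacts_by_id(input_ids):
    print(artifact.uri)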
Load the model and make a prediction
model_artifact = store.get_artifacts(
list_options = mlmd.ListOptions(
filter_query=('type = "SavedModel" AND contexts_a.type = "Experiment" AND contexts_a.name = "exp1"')))[0]
loaded_model = tf.keras.models.load_model(model_artifact.uri)
print(f'The reloaded model: {loaded_model}')
# create a random input
test_data = np.random.random((1,28,28))
label = np.argmax(loaded_model.predict(test_data, verbose=0))
print(f'Predicted Label: {label}')