!pip install zenml -q
!pip install transformers datasets -q
!pip install mlflow
!git init
!zenml init
Initializing ZenML repository at /content.
Registered stack component with name 'local_orchestrator'.
Registered stack component with name 'local_metadata_store'.
Registered stack component with name 'local_artifact_store'.
Registered stack with name 'local_stack'.
ZenML repository initialized at /content.
ls -la
import mlflow
from zenml.integrations.mlflow.mlflow_step_decorator import enable_mlflow
from zenml.pipelines import Schedule, pipeline
from zenml.steps import BaseStepConfig, Output, step
from datasets import load_dataset, load_metric
from datasets import Dataset
from datasets.dataset_dict import DatasetDict
import os
from typing import Any, Type
from zenml.artifacts import DataArtifact, SchemaArtifact, StatisticsArtifact, ModelArtifact
from zenml.materializers.base_materializer import BaseMaterializer
from transformers import AutoTokenizer
from transformers import TFAutoModelForTokenClassification, TFDistilBertForTokenClassification
from transformers import create_optimizer
from transformers import DataCollatorForTokenClassification
# File name, joined onto the artifact URI, under which a single `Dataset` is
# read/written as parquet by DatasetMaterializer.
DEFAULT_FILENAME = "dataset.parquet.gzip"
# Path component, joined onto the artifact URI, under which a whole
# `DatasetDict` is read/written by DatasetMaterializer.
DEFAULT_DICT_FILENAME = "dict_datasets"

class DatasetMaterializer(BaseMaterializer):
    """Materializer for Hugging Face ``Dataset`` and ``DatasetDict`` objects.

    A single ``Dataset`` is stored as one parquet file named
    ``DEFAULT_FILENAME``; a ``DatasetDict`` is stored with its own
    ``save_to_disk`` / ``load_from_disk`` format under
    ``DEFAULT_DICT_FILENAME``. Both live inside ``self.artifact.uri``.
    """

    ASSOCIATED_TYPES = (Dataset, DatasetDict)

    # Parquet codec. Chosen to agree with the ".parquet.gzip" suffix of
    # DEFAULT_FILENAME; kept on the class so the block is self-contained.
    COMPRESSION_TYPE = "gzip"

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Read a ``Dataset`` or ``DatasetDict`` back from the artifact store.

        Args:
            data_type: The class the consuming step expects; selects which
                on-disk layout is read.

        Returns:
            A ``Dataset`` loaded from parquet, or a ``DatasetDict`` loaded
            with ``load_from_disk``.

        Raises:
            TypeError: If ``data_type`` is neither ``Dataset`` nor
                ``DatasetDict`` (nor a subclass of one of them).
        """
        if issubclass(data_type, Dataset):
            return Dataset.from_parquet(
                os.path.join(self.artifact.uri, DEFAULT_FILENAME)
            )
        if issubclass(data_type, DatasetDict):
            return DatasetDict.load_from_disk(
                os.path.join(self.artifact.uri, DEFAULT_DICT_FILENAME)
            )
        raise TypeError(
            f"DatasetMaterializer cannot read objects of type {data_type!r}"
        )

    def handle_return(self, ds: Any) -> None:
        """Write a ``Dataset`` or ``DatasetDict`` to the artifact store.

        Args:
            ds: The object returned by the step.

        Raises:
            TypeError: If ``ds`` is neither a ``Dataset`` nor a
                ``DatasetDict``; nothing is written in that case.
        """
        if isinstance(ds, Dataset):
            filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
            ds.to_parquet(filepath, compression=self.COMPRESSION_TYPE)
        elif isinstance(ds, DatasetDict):
            filepath = os.path.join(self.artifact.uri, DEFAULT_DICT_FILENAME)
            # Counterpart of the load_from_disk call in handle_input.
            ds.save_to_disk(filepath)
        else:
            raise TypeError(
                f"DatasetMaterializer cannot write objects of type {type(ds)!r}"
            )
# Path component, joined onto the artifact URI, that HFModelMaterializer passes
# to `save_pretrained` / `from_pretrained`.
DEFAULT_MODEL_DIR = "hf_model"

class HFModelMaterializer(BaseMaterializer):
    """Materializer for a Hugging Face TensorFlow token-classification model.

    The model is written with ``save_pretrained`` and restored with
    ``TFAutoModelForTokenClassification.from_pretrained``, both pointed at
    ``DEFAULT_MODEL_DIR`` inside ``self.artifact.uri``.
    """

    ASSOCIATED_TYPES = (TFDistilBertForTokenClassification,)

    def handle_input(
        self, data_type: Type[Any]
    ) -> TFDistilBertForTokenClassification:
        """Load the model back from the artifact store.

        Args:
            data_type: The class the consuming step expects. It is not used
                for dispatch; the auto class picks the architecture from the
                saved files.

        Returns:
            The model restored from ``DEFAULT_MODEL_DIR``.
        """
        return TFAutoModelForTokenClassification.from_pretrained(
            os.path.join(self.artifact.uri, DEFAULT_MODEL_DIR)
        )

    def handle_return(self, model: Any) -> None:
        """Write the model to the artifact store.

        Args:
            model: The model returned by the step; must provide
                ``save_pretrained``.
        """
        model.save_pretrained(os.path.join(self.artifact.uri, DEFAULT_MODEL_DIR))
class TokenClassificationConfig(BaseStepConfig):
  """Settings shared by the import, tokenization, training and evaluation steps."""

  # Selects the label column: the steps read the "<task>_tags" feature.
  task = "ner"  # Should be one of "ner", "pos" or "chunk"
  # Checkpoint name passed to the tokenizer's and the model's `from_pretrained`.
  model_checkpoint = "distilbert-base-uncased"
  # Used when computing the number of training steps for the optimizer schedule.
  batch_size = 16
  # Name handed to `load_dataset` by the importer.
  dataset_name = "conll2003"
  # When True, non-first sub-tokens of a word keep the word's label instead of -100.
  label_all_tokens = True
  # Multiplier on the per-epoch step count when computing total training steps.
  num_train_epochs = 3
def data_importer(
    config: TokenClassificationConfig,
) -> DatasetDict:
  """Load the dataset named in the config and hand back all of its splits.

  Args:
    config: Step settings; only ``dataset_name`` is read here.

  Returns:
    The object produced by ``load_dataset`` for that name.
  """
  raw_datasets = load_dataset(config.dataset_name)
  # Show one training row so the column layout is visible in the run output.
  first_train_row = raw_datasets["train"][0]
  print("Sample Example :", first_train_row)
  return raw_datasets
def tokenization(
    config: TokenClassificationConfig,
    datasets: DatasetDict,
) -> DatasetDict:
  """Tokenize every split and align word-level tags with the sub-word tokens.

  Args:
    config: Supplies ``model_checkpoint`` (tokenizer to load), ``task``
      (selects the ``"<task>_tags"`` column) and ``label_all_tokens``.
    datasets: Splits whose batches expose a ``"tokens"`` column of pre-split
      words and a ``"<task>_tags"`` column indexed by word position.

  Returns:
    The result of ``datasets.map(tokenize_and_align_labels, batched=True)``:
    the tokenizer outputs plus an aligned ``"labels"`` column.
  """
  tokenizer = AutoTokenizer.from_pretrained(config.model_checkpoint)
  tags_column = f"{config.task}_tags"

  def tokenize_and_align_labels(examples):
    """Tokenize one batch and build a per-token ``labels`` list for it."""
    tokenized_inputs = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )

    labels = []
    for i, label in enumerate(examples[tags_column]):
      word_ids = tokenized_inputs.word_ids(batch_index=i)
      previous_word_idx = None
      label_ids = []
      for word_idx in word_ids:
        if word_idx is None:
          # Special tokens have a word id that is None. We set the label to
          # -100 so they are automatically ignored in the loss function.
          label_ids.append(-100)
        elif word_idx != previous_word_idx:
          # We set the label for the first token of each word.
          label_ids.append(label[word_idx])
        else:
          # For the other tokens in a word, we set the label to either the
          # current label or -100, depending on the label_all_tokens flag.
          label_ids.append(label[word_idx] if config.label_all_tokens else -100)
        previous_word_idx = word_idx

      labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

  tokenized_datasets = datasets.map(tokenize_and_align_labels, batched=True)
  return tokenized_datasets
def trainer(
    config: TokenClassificationConfig,
    tokenized_datasets: DatasetDict,
) -> TFDistilBertForTokenClassification:
  """Fine-tune a token-classification model on the tokenized training split.

  Args:
    config: Supplies ``task``, ``model_checkpoint``, ``batch_size`` and
      ``num_train_epochs``.
    tokenized_datasets: Output of the tokenization step; must contain
      ``"train"`` and ``"validation"`` splits with ``attention_mask``,
      ``input_ids`` and ``labels`` columns.

  Returns:
    The fitted model.
  """
  # NOTE(review): the config exposes no learning-rate / weight-decay / warm-up
  # fields, so the values below are assumptions taken from the Hugging Face
  # token-classification example -- confirm or move them into the config.
  init_lr = 2e-5
  weight_decay_rate = 0.01
  num_warmup_steps = 0

  # One output unit per tag name declared on the label feature.
  label_list = tokenized_datasets["train"].features[f"{config.task}_tags"].feature.names

  model = TFAutoModelForTokenClassification.from_pretrained(
      config.model_checkpoint, num_labels=len(label_list)
  )

  num_train_steps = (len(tokenized_datasets["train"]) // config.batch_size) * config.num_train_epochs
  optimizer, lr_schedule = create_optimizer(
      init_lr=init_lr,
      num_train_steps=num_train_steps,
      weight_decay_rate=weight_decay_rate,
      num_warmup_steps=num_warmup_steps,
  )
  # No explicit loss is passed: this relies on the model's built-in loss over
  # the "labels" column -- TODO confirm for the installed transformers version.
  model.compile(optimizer=optimizer)

  # The collator pads each batch (inputs and labels) to a common length.
  tokenizer = AutoTokenizer.from_pretrained(config.model_checkpoint)
  data_collator = DataCollatorForTokenClassification(tokenizer, return_tensors="tf")

  train_set = tokenized_datasets["train"].to_tf_dataset(
      columns=["attention_mask", "input_ids", "labels"],
      shuffle=True,
      batch_size=config.batch_size,
      collate_fn=data_collator,
  )
  validation_set = tokenized_datasets["validation"].to_tf_dataset(
      columns=["attention_mask", "input_ids", "labels"],
      shuffle=False,
      batch_size=config.batch_size,
      collate_fn=data_collator,
  )

  model.fit(
      train_set,
      validation_data=validation_set,
      epochs=config.num_train_epochs,
  )
  return model
import tensorflow as tf

def evaluator(
    config: TokenClassificationConfig,
    model: TFDistilBertForTokenClassification,
    tokenized_datasets: DatasetDict,
) -> float:
  """Compute the loss on the validation split and log it to MLflow.

  Args:
    config: Supplies ``model_checkpoint`` (for the collator's tokenizer) and
      ``batch_size``.
    model: The model to evaluate.
    tokenized_datasets: Output of the tokenization step; the ``"validation"``
      split is used.

  Returns:
    The value returned by ``model.evaluate`` on the validation set, which is
    also logged as the ``val_loss`` metric.
  """
  # NOTE(review): Keras refuses to evaluate an un-compiled model, and a model
  # restored via `from_pretrained` presumably carries no compile state. The
  # optimizer is irrelevant for evaluation; it only satisfies `compile`.
  model.compile(optimizer=tf.keras.optimizers.Adam())

  tokenizer = AutoTokenizer.from_pretrained(config.model_checkpoint)
  data_collator = DataCollatorForTokenClassification(tokenizer, return_tensors="tf")
  validation_set = tokenized_datasets["validation"].to_tf_dataset(
      columns=["attention_mask", "input_ids", "labels"],
      shuffle=False,
      batch_size=config.batch_size,
      collate_fn=data_collator,
  )
  test_loss = model.evaluate(validation_set, verbose=1)
  mlflow.log_metric("val_loss", test_loss)
  return test_loss
def train_pipeline(importer, tokenizer, trainer):
  """Chain import -> tokenization -> training.

  Args:
    importer: Called with no arguments; produces the raw datasets.
    tokenizer: Called with ``datasets=<importer output>``.
    trainer: Called with the tokenizer's output as its only positional argument.
  """
  raw = importer()
  tokenized = tokenizer(datasets=raw)
  # The trainer's result is not consumed by anything here, so it is not bound.
  trainer(tokenized)
#                       tokenizer=tokenization().with_return_materializers(DatasetMaterializer),
#               trainer=trainer().with_return_materializers(HFModelMaterializer)).run()
def train_eval_pipeline(importer, tokenizer, trainer, evaluator):
  """Chain import -> tokenization -> training -> evaluation.

  Args:
    importer: Called with no arguments; produces the raw datasets.
    tokenizer: Called with ``datasets=<importer output>``.
    trainer: Called with the tokenizer's output as its only positional argument.
    evaluator: Called with ``model=<trainer output>`` and
      ``tokenized_datasets=<tokenizer output>``.
  """
  tokenized = tokenizer(datasets=importer())
  fitted = trainer(tokenized)
  evaluator(model=fitted, tokenized_datasets=tokenized)
Step `trainer` has started.

#                       tokenizer=tokenization().with_return_materializers(DatasetMaterializer),
#               trainer=trainer()).run()
Install localtunnel

!zenml integration install dash -f

# The parenthesised import was left unterminated; the visualizer class exported
# by this module is restored so the statement parses.
from zenml.integrations.dash.visualizers.pipeline_run_lineage_visualizer import (
    PipelineRunLineageVisualizer,
)
from zenml.repository import Repository

# Last run of the last pipeline returned by the repository.
repo = Repository()
latest_run = repo.get_pipelines()[-1].runs[-1]
!python & lt --port 8050
MLflow Tracking

from zenml.environment import Environment
from zenml.integrations.mlflow.mlflow_environment import MLFLOW_ENVIRONMENT_NAME
# Look up the MLflow component of ZenML's Environment by its registered name;
# its `tracking_uri` is what the `mlflow ui` command below is pointed at.
mlflow_env = Environment()[MLFLOW_ENVIRONMENT_NAME]
<zenml.integrations.mlflow.mlflow_environment.MLFlowEnvironment at 0x7f487ad5fe10>
!mlflow ui --backend-store-uri {mlflow_env.tracking_uri} & lt --port 5000
your url is:
DAG Visualization

!zenml integration install graphviz -f
from zenml.repository import Repository
# The parenthesised import was left unterminated; the visualizer class exported
# by this module is restored so the statement parses.
from zenml.integrations.graphviz.visualizers.pipeline_run_dag_visualizer import (
    PipelineRunDagVisualizer,
)

# Last run of the last pipeline returned by the repository.
repo = Repository()
pipe = repo.get_pipelines()[-1]
latest_run = pipe.runs[-1]
# NOTE(review): this call is reconstructed -- the import is otherwise unused and
# the recorded cell output looks like the visualizer's warning. Confirm.
PipelineRunDagVisualizer().visualize(latest_run)
PipelineRunView(id=2, name='train_eval_pipeline-20_Feb_22-09_58_38_115601')
This integration is not completed yet. Results might be unexpected.
