home_site

Lab05 - Apache Spark - uczenie maszynowe [ ver. ChO.2023.12.02.003 ]

Zawartość strony

Apache Spark - przetwarzanie grafów i ML

Tematyka zajęć:

  1. Analiza danych linii lotniczych w Stanach Zjednoczonych
  2. Wprowadzenie do zagadnień uczenia maszynowego ( ML)
  3. Prezentacja graficzna wyników analizy ML
  4. Analiza danych z wykorzystaniem drzewa decyzyjnego

A. Analiza lotów w Stanach Zjednoczonych z wykorzystaniem grafów (kontynuacja projektu z lab.04)

  1. Tworzymy nowy projekt i ustawiamy zmienne środowiska (kontynuacja projektu).
  2. import findspark
    findspark.init()
    
    from graphframes import *
    from pyspark.sql.types import *
    from pyspark.sql import functions as F
    
    import matplotlib
    import matplotlib.pyplot as plt
    plt.style.use('fivethirtyeight')
    
    from pyspark import SparkConf
    from pyspark import SparkContext
    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[4]"))
    from pyspark.sql import SparkSession
    spark = SparkSession(sc)
    
  3. Wczytanie wartości opisy portów lotniczych - węzły.
    nodes = spark.read.csv("/home/spark/lab04/task4/airports.csv", header=False)
    
    nodes.columns
    
    cleaned_nodes = (nodes.select("_c1", "_c3", "_c4", "_c6", "_c7")
                     .filter("_c3 = 'United States'")
                     .withColumnRenamed("_c1", "name")
                     .withColumnRenamed("_c4", "id")
                     .withColumnRenamed("_c6", "latitude")
                     .withColumnRenamed("_c7", "longitude")
                     .drop("_c3"))
    
    cleaned_nodes = cleaned_nodes[cleaned_nodes["id"] != "\\N"]
    
  4. Wczytanie wartości relacji - krawędzie.
    relationships = spark.read.csv("/home/spark/lab04/task4/188591317_T_ONTIME.csv", header=True)
    
    cleaned_relationships = (relationships
                             .select("ORIGIN", "DEST", "FL_DATE", "DEP_DELAY", "ARR_DELAY",
                                     "DISTANCE", "TAIL_NUM", "FL_NUM", "CRS_DEP_TIME",
                                     "CRS_ARR_TIME", "UNIQUE_CARRIER")
                             .withColumnRenamed("ORIGIN", "src")
                             .withColumnRenamed("DEST", "dst")
                             .withColumnRenamed("DEP_DELAY", "deptDelay")
                             .withColumnRenamed("ARR_DELAY", "arrDelay")
                             .withColumnRenamed("TAIL_NUM", "tailNumber")
                             .withColumnRenamed("FL_NUM", "flightNumber")
                             .withColumnRenamed("FL_DATE", "date")
                             .withColumnRenamed("CRS_DEP_TIME", "time")
                             .withColumnRenamed("CRS_ARR_TIME", "arrivalTime")
                             .withColumnRenamed("DISTANCE", "distance")
                             .withColumnRenamed("UNIQUE_CARRIER", "airline")
                             .withColumn("deptDelay", F.col("deptDelay").cast(FloatType()))
                             .withColumn("arrDelay", F.col("arrDelay").cast(FloatType()))
                             .withColumn("time", F.col("time").cast(IntegerType()))
                             .withColumn("arrivalTime", F.col("arrivalTime").cast(IntegerType()))
                             )
    
  5. Utworzenie struktury GraphFrame.
    g = GraphFrame(cleaned_nodes, cleaned_relationships)
    
  6. Odczyt z pliku referencji do nazw linii lotniczych.
    airlines_reference = (spark.read.csv("/home/spark/lab04/task4/airlines.csv")
          .select("_c1", "_c3")
          .withColumnRenamed("_c1", "name")
          .withColumnRenamed("_c3", "code"))
    
    airlines_reference = airlines_reference[airlines_reference["code"] != "null"]
    
    df = spark.read.option("multiline", "true").json("/home/spark/lab04/task4/airlines.json")
    dummyDf = spark.createDataFrame([("test", "test")], ["code", "name"])
    
    for code in df.schema.fieldNames():
        tempDf = (df.withColumn("code", F.lit(code))
                  .withColumn("name", df[code]))
        tdf = tempDf.select("code", "name")
        dummyDf = dummyDf.union(tdf)
    
  7. Identyfikacja wszystkich linii lotniczych i ustalenie ile lotów przypada na każdą z nich.
     
    airlines = (g.edges
     .groupBy("airline")
     .agg(F.count("airline").alias("flights"))
     .sort("flights", ascending=False))
    
    full_name_airlines = (airlines_reference
                          .join(airlines, airlines.airline == airlines_reference.code)
                          .select("code", "name", "flights"))
    
    ax = (full_name_airlines.toPandas()
          .plot(kind='bar', x='name', y='flights', legend=None))
    
    ax.xaxis.set_label_text("")
    plt.tight_layout()
    plt.show()
    
  8. Realizacja funkcji, która korzysta z algorytmu "silnie połączonych komponentów" do znalezienia grup portów lotniczych dla każdej linii lotniczej, która posiada loty do i z na wszystkie lotniska znajdujące się w grupie. W następnym kroku tworzymy odpowiedni DataFrame z danymi dla każdej linii lotniczej.
    def find_scc_components(g, airline):
       # Create a subgraph containing only flights on the provided airline
       airline_relationships = g.edges[g.edges.airline == airline]
       airline_graph = GraphFrame(g.vertices, airline_relationships)
       # Calculate the Strongly Connected Components
       scc = airline_graph.stronglyConnectedComponents(maxIter=10)
       # Find the size of the biggest component and return that
       return (scc
         .groupBy("component")
         .agg(F.count("id").alias("size"))
         .sort("size", ascending=False)
         .take(1)[0]["size"])
    
     
    airline_scc = [(airline, find_scc_components(g, airline))
            for airline in airlines.toPandas()["airline"].tolist()]
    airline_scc_df = spark.createDataFrame(airline_scc, ['id', 'sccCount'])
    
    airline_reach = (airline_scc_df
      .join(full_name_airlines, full_name_airlines.code == airline_scc_df.id)
      .select("code", "name", "flights", "sccCount")
      .sort("sccCount", ascending=False))
    
     
    ax = (airline_reach.toPandas()
       .plot(kind='bar', x='name', y='sccCount', legend=None))
    ax.xaxis.set_label_text("")
    plt.tight_layout()
    plt.show()
    
    Lab5_spark_01
    Rys.5.1 Tabela zawierająca klastry lotnisk
  9. W ramach tego punktu wyznaczamy grupę lotnisk dla wybranego przewoźnika - Delta Airlines (DL).
    airline_relationships = g.edges.filter("airline = 'DL'")
    airline_graph = GraphFrame(g.vertices, airline_relationships)
    
     
    clusters = airline_graph.labelPropagation(maxIter=10)
    (clusters
      .sort("label")
      .groupby("label")
      .agg(F.collect_list("id").alias("airports"),
        F.count("id").alias("count"))
      .sort("count", ascending=False)
      .show(truncate=70, n=10))
    
  10. Prezentacja lotnisk dla klastra 1606317768706.
    from bokeh.sampledata import us_states
    from bokeh.plotting import *
    
     
    us_states = us_states.data.copy()
    
     
    del us_states["HI"]
    del us_states["AK"]
    # separate latitude and longitude points for the borders of the states.
    state_xs = [us_states[code]["lons"] for code in us_states]
    state_ys = [us_states[code]["lats"] for code in us_states]
    
     
    # init figure
    import csv
    p = figure(title="Prezentacja lotnisk dla klastra 1606317768706",
    toolbar_location="left", plot_width=1100, plot_height=700)
    # Draw state lines
    p.patches(state_xs, state_ys, fill_alpha=0.0, line_color="#884444", line_width=1.5)
    x = []
    y = []
    with open("/home/spark/lab04/task4/latlongs-cluster1.csv", "r") as latlongs_file:
      reader = csv.reader(latlongs_file, delimiter=",")
      for row in reader:
        x.append(float(row[3]))
        y.append(float(row[2]))
        # The scatter markers
    p.circle(x, y, size=6, color='navy', alpha=1)
    
     
    output_file("cluster1.html")
    # show results
    show(p)
    
    Lab5_spark_02
    Rys.5.2 Prezentacja lotnisk dla klastra 1606317768706

B. ML - analiza predykcyjna

Wprowadzenie

B1. Analiza kwiatów irysa

Do realizacji zadania wykorzytsamy zbiór danych "Iris flower" utworzony w 1936 r. przez Ronalda Fishera. Zbiór uczący opisuje kwiaty irysa przy pomocy 4 atrybutów: długości oraz szerokości płatków PETAL oraz SEPAL i przyporządkowuje każdemu z kwiatów jego faktyczny gatunek.

  1. Projekt zrealizajemy z środowisku python'a i odpowiednich bibliotek graficznych. Do realiacji zadania wykorzystamy biblioteki: matplotlib oraz sklearn. W ramach biblioteki sklearn dostępnych jest wiele danych przykładowych, w tym plik z danych dotyczącymi "Iris flower".
    from matplotlib import pyplot as plt
    from sklearn import datasets
    from sklearn.tree import DecisionTreeClassifier 
    from sklearn import tree
    
  2. Przygotownaie danych do dalszego przetwarzania.
    iris = datasets.load_iris()
    X = iris.data
    y = iris.target
    
  3. Do realizacji drzewa dezyzyjnego wykorzystamy algorytm zawarty w pakiecie "DecisionTreeClassifier" z parametrami opcjonalnymi.
    clf = DecisionTreeClassifier(random_state=1234)
    model = clf.fit(X, y)
    
  4. Prezentacja modelu w formie grafu - forma tekstowa.
    text_representation = tree.export_text(clf)
    print(text_representation)
    
  5. Prezentacja grafu w formie graficznej.
  6. Prezentacja modelu w formie grafu - forma tekstowa.
    with open("decistion_tree.log", "w") as fout:
        fout.write(text_representation)
    
    fig = plt.figure(figsize=(25,20))
    _ = tree.plot_tree(clf, 
                       feature_names=iris.feature_names,  
                       class_names=iris.target_names,
                       filled=True)
    
  7. Prezentacja grafu decyzji z wykorzystaniem bibliotek graphviz i dtreeviz.
  8. Graf z wykorzystaniem biblioteki graphviz.
    import graphviz
    dot_data = tree.export_graphviz(clf, out_file=None, 
                                    feature_names=iris.feature_names,  
                                    class_names=iris.target_names,
                                    filled=True)
    
    graph = graphviz.Source(dot_data, format="png") 
    
     
    graph
    
  9. Graf z wykorzystaniem biblioteki dtreeviz.
    from dtreeviz.trees import dtreeviz
    
    viz = dtreeviz(clf, X, y,
                    target_name="target",
                    feature_names=iris.feature_names,
                    class_names=list(iris.target_names))
    
    viz
    

B2. Prezentacja algorytmów realizujących analizę predykcyjną ( MLLib Spark )

  1. Prognozowanie jednej wartości na podstawie innych określane jest mianem regresji (http://pl.wikipedia.org/wiki/Regresja_(statystyka)). Techniki regresyjne powiązane są również z technikami klasyfikacyjnymi (http//pl.wikipedia.org/wiki/Klasyfikacja_statystyczna).
    Ogólnie termin „regresja” oznacza prognozowanie wymiernej wartości, na przykład wielkości, przychodów czy temperatury, natomiast klasyfikacja oznacza prognozowanie kategorii, na przykład „spam” czy „poprawna wiadomość”.
    Wspólną cechą regresji i klasyfikacji jest prognozowanie jednej (lub kilku) wartości na podstawie jednej (lub kilku) innych wartości. W obu przypadkach potrzebne są dane wejściowe i wyjściowe, aby poznać ich charakter. Muszą być znane pytania o dane i odpowiedzi na nie.
    Dlatego oba pojęcia określa się mianem uczenia nadzorowanego (http://pl.wikipedia.org/wiki/Uczenie_nadzorowane).
  2. Przeentacja niektórych algorytmów predykcyjnych dostępnych w pakietach MLlib Spark (również w pyspark).
  3. Przygotowanie środowiska do prezentacji działania algorytmów wraz z przykładowymi danymi.
    import findspark
    findspark.init()
    
    from pyspark import SparkConf
    from pyspark import SparkContext
    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[4]"))
    
    from pyspark.mllib.regression import LabeledPoint
    
    data = [
         LabeledPoint(0.0, [1, 0, 0, 0]),
         LabeledPoint(1.0, [0, 1, 1, 1]),
         LabeledPoint(0.0, [2, 0, 0, 0]),
         LabeledPoint(1.0, [0, 2, 1, 3])
    ]
    
    rdd = sc.parallelize(data)
    features = [p.features.tolist() for p in data]
    
    type(features)
    
    print(features)
    
  4. Algorytm oparty o model - LogisticRegressionWithSGD (Binary Logistic Regression using Stochastic Gradient Descent)
    from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
    lr_model = LogisticRegressionWithSGD.train(rdd)
    
    print('1 ->',lr_model.predict(features[0]))
    print('1 ->',lr_model.predict(features[1]))
    print('2 ->',lr_model.predict(features[2]))
    print('3 ->',lr_model.predict(features[3]))
    print('x ->',lr_model.predict([0.0,0.0,0.0,10.0]))
    
  5. Algorytm oparty o model - SVMWithSGD (Support Vector Machine (SVM) using Stochastic Gradient Descent)
    svm_model = SVMWithSGD.train(rdd)
    
    print('1 ->',svm_model.predict(features[0]))
    print('1 ->',svm_model.predict(features[1]))
    print('2 ->',svm_model.predict(features[2]))
    print('3 ->',svm_model.predict(features[3]))
    print('x ->',svm_model.predict([0.0,0.0,0.0,10.0]))
    
  6. Algorytm oparty o model - Naive Bayes Classification
    nb_model = NaiveBayes.train(rdd)
    
    print('1 ->',nb_model.predict(features[0]))
    print('1 ->',nb_model.predict(features[1]))
    print('2 ->',nb_model.predict(features[2]))
    print('3 ->',nb_model.predict(features[3]))
    print('x ->',nb_model.predict([0.0,0.0,0.0,10.0]))
    
  7. Algorytm oparty o model - DecisionTree
    from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
    
    categoricalFeaturesInfo = {0: 3}
    
    dt_model = DecisionTree.trainClassifier(
                rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo)
    
    print('1 ->',dt_model.predict(features[0]))
    print('1 ->',dt_model.predict(features[1]))
    print('2 ->',dt_model.predict(features[2]))
    print('3 ->',dt_model.predict(features[3]))
    print('x ->',dt_model.predict([0.0,0.0,0.0,10.0]))
    

B3. Drzewo decyzyjne - analiza gry w golfa w zależności od pogody.

  1. Otwieramy nowy projekt i ustawiamy parametry środowiska.
    import findspark
    findspark.init()
    
  2. Przygotowanie bibliotek do realizacji zadania w ramach funkcjonalności bibliotek opracowanych dla języka python wyznaczających model drzewa decyzyjnego.
    from pyspark import SparkConf
    from pyspark import SparkContext
    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
    
    from pyspark.mllib.regression import LabeledPoint
    
  3. Przygotowanie danych do realizacji zadania.
    outlook = {'sunny': 0.0, 'overcast': 1.0, 'rainy': 2.0}
    labeledpoints = [
      LabeledPoint(0.0,[outlook['sunny'],85,85,False]),
      LabeledPoint(0.0,[outlook['sunny'],80,90,True]), 
      LabeledPoint(1.0,[outlook['overcast'],83,86,False]),
      LabeledPoint(1.0,[outlook['rainy'],70,96,False]),
      LabeledPoint(1.0,[outlook['rainy'],68,80,False]),
      LabeledPoint(0.0,[outlook['rainy'],65,70,True]),
      LabeledPoint(1.0,[outlook['overcast'],64,65,True]),
      LabeledPoint(0.0,[outlook['sunny'],72,95,False]),
      LabeledPoint(1.0,[outlook['sunny'],69,70,False]),
      LabeledPoint(1.0,[outlook['sunny'],75,80,False]),
      LabeledPoint(1.0,[outlook['sunny'],75,70,True]),
      LabeledPoint(1.0,[outlook['overcast'],72,90,True]),
      LabeledPoint(1.0,[outlook['overcast'],81,75,False]),
      LabeledPoint(0.0,[outlook['rainy'],71,91,True])
      ]
    
    data = sc.parallelize(labeledpoints).cache()
    
  4. Wyznaczenie modelu i predykcja wartości oczekiwanej z wykorzystaniem drzewa decyzyjnego (MLLib).
    from pyspark.mllib.tree import DecisionTree
    
    model = DecisionTree.trainClassifier(data=data,
       numClasses=2,
       categoricalFeaturesInfo={0: 3})
    
    print(model.toDebugString())
    
     
    model.predict([outlook["overcast"],85,85,True])
    
  5. Wyznaczenie modelu i predykcja z wykorzystaniem Naive Bayes Classifier (MLLib)
    from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
    
    model = NaiveBayes.train(data=data, lambda_=1.0)
    
     
    model.predict([1.0,85,85,True])
    
  6. Przygotowanie bibliotek do realizacji modelu drzewa decyzyjnego z wykorzystaniem funkcjonalności w programie Spark.
    from pyspark.ml.linalg import DenseVector
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    
  7. Przygotowanie zestawu danych.
    from pyspark.sql import Row # Prepare DataFrame of labeled observations
    outlook = {"sunny": 0.0, "overcast": 1.0, "rainy": 2.0}
    observations = [
    Row(label=0, features=DenseVector([outlook["sunny"],85,85,False])),
    Row(label=0, features=DenseVector([outlook["sunny"],80,90,True])),
    Row(label=1, features=DenseVector([outlook["overcast"],83,86,False])),
    Row(label=1, features=DenseVector([outlook["rainy"],70,96,False])),
    Row(label=1, features=DenseVector([outlook["rainy"],68,80,False])),
    Row(label=0, features=DenseVector([outlook["rainy"],65,70,True])),
    Row(label=1, features=DenseVector([outlook["overcast"],64,65,True])),
    Row(label=0, features=DenseVector([outlook["sunny"],72,95,False])),
    Row(label=1, features=DenseVector([outlook["sunny"],69,70,False])),
    Row(label=1, features=DenseVector([outlook["sunny"],75,80,False])),
    Row(label=1, features=DenseVector([outlook["sunny"],75,70,True])),
    Row(label=1, features=DenseVector([outlook["overcast"],72,90,True])),
    Row(label=1, features=DenseVector([outlook["overcast"],81,75,False])),
    Row(label=0, features=DenseVector([outlook["rainy"],71,91,True]))
    ]
    
  8. Przygotawanie danych w strukturze DataFrame.
    rdd = sc.parallelize(observations)
    
    from pyspark.sql import SparkSession  
    spark = SparkSession \
            .builder \
            .appName("Python Spark Data Exploration") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
    
     data = spark.createDataFrame(rdd)
    
  9. Wyznaczenie zbioru uczącego i testowego ( 30% danych w zbiorze testowym).
    (trainingData, testData) = data.randomSplit([0.7, 0.3])
    
    trainingData.show()
    
    testData.show()
    
  10. Wyznaczenie modelu drzewa decyzyjnego.
    dt = DecisionTreeClassifier()
    
    model = dt.fit(trainingData)
    
    predictions = model.transform(testData)
    
    predictions.show()
    
  11. Sparwdzenie poprawności drzewa decyzyjnego z wykorzystaniem danych testowych.
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    
    accuracy = evaluator.evaluate(predictions)
    
    predictions = model.transform(testData)
    
    print("Test Error = %g " % (1.0 - accuracy))
    

B4. Decision Tree Regression Model in Machine Learning

  1. W ramach zadania skorzystamy z zestawu danych mieszkaniowych dla miasta Boston pochodzących z informacji zebranych przez U.S. Census Service a dotyczących mieszkalnictwa w rejonie Boston MA. Plik jest dostepny w serwisie https://www.kaggle.com/.
    Opis kolumn w pliku:
  2. Otwieramy nowy projekt w ramach Jupyter'a.
                                                                 
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    
  3. pobranie danych z pliku i dodanie wiersza nagłówkowego.
    column_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT', 'MEDV']
    
    housing = pd.read_csv('/home/spark/lab05/housing.csv',header=None, delimiter=r"\s+", names=column_names)
    
    housing.head()
    
  4. Prezentacja danych na wykresie punktowym (wybieramy pola RM i MEDV).
    plt.scatter(x=housing['RM'],y=housing['MEDV'],color='red')
    plt.xlabel('average number of rooms per dwelling')
    plt.ylabel('Median value of owner-occupied homes in $1000s')
    
  5. Przegotowanie danych uczących i testowych (20% danych testowych).
    x=pd.DataFrame(housing['RM'])
    y=pd.DataFrame(housing['MEDV'])
    
    from sklearn.model_selection import train_test_split
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.20)
    
  6. Tworzenie modelu drzewa decyzyjnego.
    from sklearn.tree import DecisionTreeRegressor
    
    regressor = DecisionTreeRegressor(criterion='mse',random_state=100,max_depth=4, min_samples_leaf=1)
    
    regressor.fit(x_train,y_train)
    
  7. Prezentacja modelu w formie graficznej w serwisie "http://www.webgraphviz.com/". Generujemy plik reg_tree.dot, który wczytamy w interaktywnym panelu serwisu. Na rys. 5.3 prezentacja uzysaknego modelu drzewa decyzyjnego. Realizacja zadania wymaga zainstalowania bilioteki pydotplus (pip install pydotplus).
    from sklearn.tree import export_graphviz
    import pydotplus
    
    export_graphviz (regressor, out_file='reg_tree.dot')
    
    Lab5_spark_03
    Rys.5.3 Graf drzewa decyzyjnego
  8. Predykcja wartości z wykorzystanie zbioru testowego i porównanie z wartościami oryginalnymi.
    y_pred = regressor.predict(x_test)
    
    print(y_pred[4:9])
    print(y_test[4:9])
    
    from sklearn.metrics import mean_squared_error
    mse = mean_squared_error(y_pred,y_test)
    rmse = np.sqrt(mse)
    rmse