Spark ve Hive ile Basit bir ETL Uygulaması

Herkese Selam,

Bu yazıda HDFS (Hadoop File System) üzerinde Spark ile örnek bir veri setini okuyup  basit bir analitik operasyon yapıp daha sonra Hive’da yaratacağım bir tabloya yazacağım.

Günümüzde üretilen veri miktarının çok yüksek ölçekte olması ve toplanan verinin analiz edilme ihtiyacının her geçen gün artması sonucunda büyük veri teknolojileri hayatımıza hızla girdi. Artık neredeyse bir çok firmanın büyük veri ortamları bulunmakta ve bu ortamlarda büyük veri teknolojilerini kullanarak analizler yapmaktalar.

Endüstride çok bilinen ve kullanımı en yaygın olaran büyük veri ortamları Hadoop’tur. Hadoop ekosisteminde, verinin saklanmasından verinin işlenmesine, verinin  görselleştirilmesinden sorgulanmasına kadar bir çok çözüm bulunmaktadır.  Hadoop ekosisteminde bulunan her bir çözüm ve kütüphane ayrı uzmanlıklar gerektiren derinlikte teknolojilerdir. Bu ekosistemin barındırdığı farklı teknolojileri ve kullanım alanlarını aşağıdaki resimden inceleyebilirsiniz.

Şimdi örnek uygulamanın yapımına geçelim. Yapacağımız örnekte veriyi önce linux file sistemimizden, Hadoop ekosisteminin veri saklama ünitesi olan HDFS’e göndereceğiz (Bu adıma Extraction diyebiliriz.). Daha sonra buraya yazdığımız veriyi Spark ile önce okuyacağız ve daha sonra basit bir transformasyon (Transformation) uygulayıp Hive’a yazacağız (Load). Hive, hadoop ekosisteminde var olan ve bu ortamda saklanan verileri sorgulayabilmemizi sağlayan bir alt yapı. Biz bu alt yapı ile büyük veri ortamımızdaki verileri SQL dili kullanarak kolayca sorgulayabiliyoruz.

Öncelikle örnek çalışacağımız veri setini tanıyalım. Çalışacağımız veri seti Kaggle üzerindenki “Sample Sales Data” veri seti. Aşağıdaki tabloda kolon adlarını ve tiplerini görebilirsiniz.

Column Name Data Type
ORDERNUMBER Number
QUANTITYORDERED Number
PRICEEACH Number
ORDERLINENUMBER Number
SALES Number
ORDERDATE Date
STATUS String
QTR_ID Number
MONTH_ID Number
YEAR_ID Number
PRODUCTLINE String
MSRP Number
PRODUCTCODE String
CUSTOMERNAME String
PHONE Number
ADDRESSLINE1 String
ADDRESSLINE2 String
CITY String
STATE String
POSTALCODE Number
COUNTRY String
TERRITORY String
CONTACTLASTNAME String
CONTACTFIRSTNAME String
DEALSIZE String

Bu veriseti, örnek satış işlemleri transactionlarını içermektedir. Biz bu örnek satış datasından bazı transformationlar yapıp örnek bir rapor datası oluşturup bunu bir Hive tablosuna insert edeceğiz.

İlk olarak Kaggle’dan download ettiğimiz örnek veriyi HDFS’e kopyalayalım. Bunun için hdfs’te öncelikle bir dizin oluşturalım.

hadoop fs -mkdir samplesales

Evet, hdfs’te dizini yarattık şimdi ise lokal dizinimiz de duran örnek veriyi hdfs’e kopyalayalım.

hadoop fs -copyFromLocal sales_sample_data.csv samplesales

hadoop fs -ls samplesales/

Veriyi hdfs’e yazdık şimdi ise Pyspark arayüzümüzü başlatıp veriyi spark ile işlemeye başlayalım.

/user/spark-2.1.0-bin-hadoop2.7/bin/pyspark --master yarn-client --num-executors 4 --driver-memory 2g --executor-memory 4g

Spark başarılı bir şekilde başladı. Bu aşamada kullanacağımız kütüphaneleri import edelim.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql import HiveContext
from pyspark.sql.functions import *

hive_context = HiveContext(sc)
sqlContext = SQLContext(sc)

Şimdi ise örnek versetimizi tutacağımız satır formatını oluşturalım.

RecSales = RecSales = Row('ordernumber','quantityordered','priceeach','orderlinenumber','sales','orderdate','status','qtr_id','month_id','year_id','productline','msrp','productcode','customername','phone','addressline1','addressline2','city','state','postalcode','country','territory','contactlastname','contactfirstname','dealsize')

Sıra hdfs’e aldığımız verimizi okumaya ve bir dataframe içerisine yazmaya geldi. Veri içerisinde kolonlar “,” ayracı ile ayrildığı için bunu ayracı veriyi yüklerken belirtiyoruz. Veriyi data frame içine aldıktan sonra bir isim verip temp table olarak işaretliyoruz. Bu isimi daha sonra hive_context veya sqlContext içerisinde SQL yazarken kullanabileceğiz.

dataSales = sc.textFile("/user/samplesales/")
header = dataSales.first()
dataSales= dataSales.filter(lambda line: line != header)
recSales = dataSales.map(lambda l: l.split(","))
dataSales = recSales.map(lambda l: RecSales(l[0],l[1],l[2],l[3],l[4],l[5],l[6],l[7],l[8],l[9],l[10],l[11],l[12],l[13],l[14],l[15],l[16],l[17],l[18],l[19],l[20],l[21],l[22],l[23],l[24]))
dfRecSales = hive_context.createDataFrame(dataSales)
dfRecSales.registerTempTable("sales")

Veriyi başarılı bir şekilde hdfs’ten okuduk ve bir data frame objesi içerisine yazdık. Şimdi Spark SQL ile yüklediğimiz veriye basit bir kaç sorgu atalım.

hive_context.sql("select count(*) from sales").collect()

hive_context.sql("select * from sales").head()

hive_context.sql("select ordernumber,priceeach  from sales").head(2)

Evet şimdi satışları territory (bölge) alanına göre gruplayalım ve sonuçları bir Hive tablosuna yazalım.

dfterriroty = hive_context.sql("select territory,sum(priceeach) total from sales group by territory")
dfterriroty.registerTempTable("sumterr")
dfterriroty.show()

Elde ettiğimiz sonucu bir hive tablosu yaratıp içerisine yazalım.

hive_context.sql("CREATE TABLE IF NOT EXISTS territory_sum (territory String, total INT)")

hive_context.sql("insert into territory_sum select * from sumterr")

Son olarak Hive’a yazılan datayı kontrol edelim.

hive_context.sql("select * from territory_sum").show()

hive_context.sql("select count(*) from territory_sum").show()

hive_context.sql("select count(*) from sumterr").show()

Advertisements

About ... from Emrah METE

Bilgisayar Mühendisi
This entry was posted in Uncategorized and tagged , , , , , , , , , , , . Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.