메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


출처 : http://letsexplorehadoop.blogspot.com/2016/05/upsert-in-hive-3-step-process.html



아래설명을 기준으로 hive에서 실행해본 hive script

--------------------------------------------------------------------------------------------------

-- create table if not exists site_view_hist(

-- brower_name string,

-- clicks_count int,

-- impressions_count int)

-- partitioned by (hit_date date)

-- row format delimited

-- fields terminated by ',';


-- gooper@gsda1:/var/log$ hdfs dfs -cat /user/hive/warehouse/site_view_hist/hit_date=2016-01-01/000000_0

--iexplorer,123,456


SET hive.support.concurrency = true;

SET hive.enforce.bucketing = true;

SET hive.exec.dynamic.partition.mode = nonstrict;

SET hive.txn.manager =org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

SET hive.compactor.initiator.on = true;

SET hive.compactor.worker.threads = 1;


truncate table site_view_hist;

truncate table site_view_raw;



insert into table site_view_hist partition(hit_date='2016-01-01') values('iexplorer', 123, 456);

insert into table site_view_hist partition(hit_date='2016-01-01') values('firefox', 123, 456);

insert into table site_view_hist partition(hit_date='2016-01-01') values('chrome', 123, 456);

insert into table site_view_hist partition(hit_date='2016-01-02') values('firefox', 111, 431);

insert into table site_view_hist partition(hit_date='2016-01-03') values('chrome', 234, 567);

insert into table site_view_hist partition(hit_date='2016-01-03') values('iexplorer', 234, 567);

insert into table site_view_hist partition(hit_date='2016-01-03') values('firefox', 987, 654);

insert into table site_view_hist partition(hit_date='2016-01-04') values('chrome', 529, 912);

insert into table site_view_hist partition(hit_date='2016-01-05') values('firefox', 911, 888);

insert into table site_view_hist partition(hit_date='2016-01-06') values('iexplorer', 900, 833);



select * from site_view_hist;


-- create table if not exists site_view_raw(

-- brower_name string,

-- clicks_count int,

-- impressions_count int)

-- partitioned by (hit_date date)

-- row format delimited

-- fields terminated by ',';


insert into table site_view_raw partition(hit_date='2016-01-01') values('chrome', 246, 789);

insert into table site_view_raw partition(hit_date='2016-01-01') values('firefox', 999, 200);

insert into table site_view_raw partition(hit_date='2016-01-31') values('iexplorer', 144, 999);


select * from site_view_raw;



select h.* from site_view_hist h where h.hit_date in (select distinct hit_date from site_view_raw r);


drop table site_view_temp1;


--아래 설명에서 subquery부분에 brower_name is not null을 추가하여 파티션만 있고 데이타 없는 경우는 포함되지 않도록함

create table site_view_temp1

as select h.* from site_view_hist h where h.hit_date in (select distinct hit_date from site_view_raw r where r.brower_name is not null);


select * from site_view_temp1;


create table site_view_temp2 as select t1.* from site_view_temp1 t1

where not exists

(select 1 from site_view_raw r

where t1.brower_name=r.brower_name

and t1.hit_date=r.hit_date);


select * from site_view_temp2;



insert into table site_view_temp2

select * 

from site_view_raw;


select * from site_view_temp2;


insert overwrite table site_view_hist

partition(hit_date)

select * from site_view_temp2;



select * from site_view_hist;

--------------------------------------------------------------------------------------------------

UPSERT in Hive(3 Step Process)

In this post I am providing a 3 step process for performing UPSERT in hive on a large size table containing entire history.
Just for the audience not aware of UPSERT - It is a combination of UPDATE and INSERT. If on a table containing history data, we receive new data which needs to be inserted as well as some data which is an UPDATE to the existing data, then we have to perform an UPSERT operation to achieve this.

Prerequisite – The table containing history being very large in size should be partitioned, which is also a best practice for efficient data storage, when working with large data in hive.

Business scenario – Lets take a scenario of a website table containing website metrics as gathered from different browsers of visitors who visited the website. The site_view_hist table contains the clicks and page impressions counts from different browsers and the table is partitioned on hit_date(the date on which the visitor visited the website).
Clicks – number of clicks(Eg on adds displayed) done by visitor on website page.
Impressions – number of times the website pages or different sections were viewed by the visitor.

Problem statement - If we receive correction in the number of clicks and impressions as recorded by browser, we need to update them in the history table and also insert any new records we received.
Lets dive into it:
In the history table we have browser_name and hit_date as a composite key which will remain constant and we receive updates in the values of clicks_count and impressions_count columns.
DDL of history table

Data:

Now suppose we receive records for date 2016-01-01(marked in blue) for firefox and chrome browsers, with an updated value of clicks and impressions, and we also received a new record(iexplorer) for 2016-01-31. Let us store these new and updated records in the following raw table:
DDL of Raw table




Data

Now we need an UPSERT solution, which updates the records of site_view_hist table for hit_date 2016-01-01 and insert the new record for 2016-01-31.
                                               SOLUTION (3 STEP):
To achieve this in an efficient way, we will use the following 3 step process:
Prep Step - We should first get those partitions from the history table which needs to be updated. So we create a temp table site_view_temp1 which contains the rows from history with hit_date equal to the hit_date of raw table.
This will bring us all the hit_date partitions from history table for which atleast one updated record exists in the raw table.
Note - Instead of table we can also create a view for efficient processing and saving storage space.


Data of site_view_temp1 table:

Step 1 – From these fetched partitions we will separate the old unchanged rows. These are the rows in which there is no change in the clicks and impressions count. For this we will create a temp table site_view_temp2 as follows:








Data of site_view_temp2 table:

Step2 – Now we will insert into this new temp table, all the rows from the raw table. This step will bring in the updated rows as well as any new rows. And since site_view_temp2 already contained the old rows, so it will now have all the rows including new, updated, and unchanged old rows. Following query does this: 



New Data of site_view_temp2 table

Step3 – Now simply insert overwriting the site_view_hist table with site_view_temp2 table, will provide us the required output rows including two updated rows for 2016-01-01 and one new inserted row for 2016-01-31.
Catch – Since the history table is partitioned on the hit_date, the respective partitions will only be overwritten as follows:




Final history table  with updated and inserted rows:

Benefits of this approach:         
  1. In the prep step itself since we are fetching just the partitions we have to update, so we are not scanning the whole history table. This makes our processing faster.
  2. In the final step as we are insert overwriting the history with the temp table, we are touching just the partition we want to update along with a new partition created for the new record.This gives a high performance gain, as I gained for my production process on a 6.7 TB history table with over 5 billion records. But since my 3 step process(included in one hive script) just touched few partitions of few thousand rows, the process completed in just minutes.
번호 제목 글쓴이 날짜 조회 수
89 hive metastore ERD file 총관리자 2018.09.20 723
88 oracle to hive data type정리표 총관리자 2018.08.22 763
87 RHEL 7.4에 zeppelin 0.7.4 설치 총관리자 2018.07.31 196
86 conda를 이용한 jupyterhub(v0.9)및 jupyter설치 (v4.4.0) 총관리자 2018.07.30 416
» upsert구현방법(년-월-일 파티션을 기준으로) 및 테스트 script file 총관리자 2018.07.03 1219
84 Toree 0.1.0-incubating이 Scala 2.10.4까지만 지원하게 되어서 발생하는 NoSuchMethod오류 문제 해결방법(scala 2.11.x을 지원하지만 오류가 발생할 수 있음) 총관리자 2018.04.20 110
83 우분투 16.04LTS에 Zeppelin 0.7.3설치 총관리자 2018.04.18 198
82 CentOS 7.x에 Jupyter설치 총관리자 2018.04.18 550
81 Apache Toree설치(Jupyter에서 Scala, PySpark, SparkR, SQL을 사용할 수 있도록 하는 Kernel) 총관리자 2018.04.17 146
80 우분투 16.04LTS에 Jupyter설치 총관리자 2018.04.17 90
79 beeline으로 접근시 "User: gooper is not allowed to impersonate anonymous (state=08S01,code=0)"가 발생하면서 "No current connection"이 발생하는 경우 조치 총관리자 2018.04.15 193
78 Scala에서 countByWindow를 이용하기(예제) 총관리자 2018.03.08 235
77 Scala를 이용한 Streaming예제 총관리자 2018.03.08 69
76 scala application 샘플소스(SparkSession이용) 총관리자 2018.03.07 135
75 spark-submit 실행시 "java.lang.OutOfMemoryError: Java heap space"발생시 조치사항 총관리자 2018.02.01 515
74 Could not compute split, block input-0-1517397051800 not found형태의 오류가 발생시 조치방법 총관리자 2018.02.01 184
73 spark stream처리할때 두개의 client프로그램이 동일한 checkpoint로 접근할때 발생하는 오류 내용 총관리자 2018.01.16 1113
72 Windows7 64bit 환경에서 Apache Spark 2.2.0 설치하기 총관리자 2017.07.26 259
71 Hive MetaStore Server기동시 Could not create "increment"/"table" value-generation container SEQUENCE_TABLE since autoCreate flags do not allow it. 오류발생시 조치사항 총관리자 2017.05.03 340
70 Ubuntu 16.04 LTS에 Hive 2.1.1설치하면서 "Version information not found in metastore"발생하는 오류원인및 조치사항 총관리자 2017.05.03 465

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로