
Recursion in Hive - part 1

I am going to start this new series of blog posts by talking about code migration use cases. We will talk about migrating from an RDBMS to Hive while keeping the simplicity and flexibility of a SQL approach. The first case is about recursive SQL. In most RDBMS situations this is covered by recursive queries using a "with" clause; unfortunately, that is not yet supported in Hive.

Let's consider the following scenario. We have PRODUCTs and STATEs. STATEs form a forest of trees. The facts are combinations of PRODUCTs and STATEs which may have some data. Here are the simplified DDLs:

[sourcecode language="SQL"]
create table t_product (
  product string);

create table t_state (
  state string,
  next_state string);

create table t_big_data (
  product string,
  state string,
  data string);
[/sourcecode]

The task is: for an input set of pairs (PRODUCT, STATE), find the next available STATE with data in the fact table, or return NULL. The input data is stored in the t_input table:

[sourcecode language="SQL"]
create table t_input (
  product string,
  state string);
[/sourcecode]

We need to populate the t_output table:

[sourcecode language="SQL"]
create table t_output (
  product string,
  state string,
  found_state string,
  data string);
[/sourcecode]

There are various methods to solve this: a procedural approach with recursive functions, recursive SQL, or multi-joins (in case we know the maximum depth). The most reasonable for a modern RDBMS supporting recursive queries would be something like this:

[sourcecode language="SQL"]
insert into t_output(product, state, found_state, data)
with rec (product, state1, state2, data) as (
  select i.product, i.state, i.state, d.data
  from t_input i
  left join t_big_data d
    on d.product = i.product and d.state = i.state
  union all
  select r.product, r.state1, s.next_state, d.data
  from rec r
  join t_state s on s.state = r.state2
  left join t_big_data d
    on d.product = r.product and d.state = s.next_state
  where r.data is null
)
select product, state1 as state, state2 as found_state, data
from rec
where data is not null or state2 is null;
[/sourcecode]

An RDBMS can build a good execution plan for such queries, especially with the right indexes on the t_big_data table. We could take a multi-join approach in Hive, but the cost of each scan of the big table is too high, as the sketch below illustrates.
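For illustration only, here is a rough sketch of what the multi-join approach might look like, assuming a maximum tree depth of two and at most one fact row per (product, state); both assumptions are mine, not from the original task. Every extra tree level costs another join against, and scan of, t_big_data:

[sourcecode language="SQL"]
-- a sketch only: assumes max depth 2 and at most one row per (product, state)
select i.product,
       i.state,
       case when d0.data is not null then i.state
            when d1.data is not null then s1.next_state
            when d2.data is not null then s2.next_state
       end as found_state,
       coalesce(d0.data, d1.data, d2.data) as data
from t_input i
left join t_big_data d0 on d0.product = i.product and d0.state = i.state
left join t_state    s1 on s1.state = i.state
left join t_big_data d1 on d1.product = i.product and d1.state = s1.next_state
left join t_state    s2 on s2.state = s1.next_state
left join t_big_data d2 on d2.product = i.product and d2.state = s2.next_state;
[/sourcecode]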
The trick we will use here instead is based on the observation that a tree structure is usually relatively small in comparison with the data table. Thus we can easily "expand" the tree into a flat, denormalized structure: for each STATE from the initial table, we keep all STATEs on the way to the root, along with the path length. For example, for the following simple tree:

[sourcecode language="SQL"]
STATE NEXT_STATE
----- ----------
S1    NULL
S2    S1
S3    S1
S4    S2
[/sourcecode]

We would have:

[sourcecode language="SQL"]
STATE1 STATE2 LEVEL
------ ------ -----
S1     S1     0
S2     S2     0
S2     S1     1
S3     S3     0
S3     S1     1
S4     S4     0
S4     S2     1
S4     S1     2
[/sourcecode]

Using RDBMS recursive queries, we would create this table as:

[sourcecode language="SQL"]
create table t_expand_state (
  state1 string,
  state2 string,
  lvl integer);

insert into t_expand_state (state1, state2, lvl)
with rec (state1, state2, lvl) as (
  select state, state, 0
  from t_state
  union all
  select r.state1, s.next_state, r.lvl + 1
  from rec r
  join t_state s on r.state2 = s.state
  where s.next_state is not null
)
select * from rec;
[/sourcecode]

For Oracle we could do this with "connect by":

[sourcecode language="SQL"]
select connect_by_root state state1, state as state2, level - 1 lvl
from t_state
connect by prior next_state = state;
[/sourcecode]

Having this t_expand_state table, we can rewrite our query as:

[sourcecode language="SQL"]
insert into t_output(product, state, found_state, data)
select t.product, t.state,
       case when t.min_lvl_with_data is not null then t.state2 end,
       t.data
from (
  select i.product, i.state, s.state2, s.lvl, d.data,
         min(case when d.data is not null then lvl end)
           over (partition by i.product, i.state) min_lvl_with_data
  from t_input i
  join t_expand_state s on s.state1 = i.state
  left join t_big_data d
    on d.product = i.product and d.state = s.state2) t
where t.lvl = t.min_lvl_with_data
   or (t.lvl = 0 and t.min_lvl_with_data is null);
[/sourcecode]

This solution has its own edge cases of inefficiency:
- a big t_state that produces an abnormally large t_expand_state table;
- a dense t_big_data table, so that during query execution we have to carry a lot of extra rows with "data" for states we don't need;
- a big t_input: joining it by all "next states" would inflate the dataset.

But in practice t_input is usually relatively small, and there isn't much overhead in fetching the extra data for the next states. Another advantage is that we scan t_big_data only once. To make the semantics of the query concrete, here is a small worked example.
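The data below is invented purely for illustration (hypothetical products and values), reusing the sample tree above:

[sourcecode language="SQL"]
-- hypothetical sample data, using the tree S4 -> S2 -> S1 and S3 -> S1
insert into table t_big_data values ('P1', 'S1', 'd1');
insert into table t_input values ('P1', 'S4'), ('P1', 'S3'), ('P2', 'S4');

-- expected content of t_output after running the query above:
-- PRODUCT  STATE  FOUND_STATE  DATA
-- P1       S4     S1           d1     (first data on the path S4 -> S2 -> S1)
-- P1       S3     S1           d1     (first data on the path S3 -> S1)
-- P2       S4     NULL         NULL   (no data anywhere on the path)
[/sourcecode]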
To reach our goal, the only task left is: how do we build t_expand_state in Hive without recursion? We could certainly consider multi-joins once again, but my choice is a UDTF. To make the recursion more natural, I implemented this function in Scala. In ExpandTree2UDTF we store the tree structure in a mutable map during the "process" method calls; after that, "close" expands this map using memoization.

[sourcecode language="Scala" collapse="true"]
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector,
  ObjectInspectorFactory, PrimitiveObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class ExpandTree2UDTF extends GenericUDTF {
  var inputOIs: Array[PrimitiveObjectInspector] = null
  val tree: collection.mutable.Map[String, Option[String]] = collection.mutable.Map()

  override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {
    inputOIs = args.map { _.asInstanceOf[PrimitiveObjectInspector] }
    val fieldNames = java.util.Arrays.asList("id", "ancestor", "level")
    val fieldOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector
      .asInstanceOf[ObjectInspector]
    val fieldOIs = java.util.Arrays.asList(fieldOI, fieldOI, fieldOI)
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
  }

  // each input row is one (state, next_state) edge; just remember it
  override def process(record: Array[Object]): Unit = {
    val id = inputOIs(0).getPrimitiveJavaObject(record(0)).asInstanceOf[String]
    val parent = Option(inputOIs(1).getPrimitiveJavaObject(record(1)).asInstanceOf[String])
    tree += (id -> parent)
  }

  // once all edges are collected, emit (id, ancestor, level) rows for every node
  override def close(): Unit = {
    val expandTree = collection.mutable.Map[String, List[String]]()
    def calculateAncestors(id: String): List[String] = tree(id) match {
      case Some(parent) => id :: getAncestors(parent)
      case None         => List(id)
    }
    // memoization: each node's ancestor chain is computed at most once
    def getAncestors(id: String) = expandTree.getOrElseUpdate(id, calculateAncestors(id))
    tree.keys.foreach { id =>
      getAncestors(id).zipWithIndex.foreach {
        // level is forwarded as a string to match the declared string inspectors
        case (ancestor, level) => forward(Array(id, ancestor, level.toString))
      }
    }
  }
}
[/sourcecode]

Having this, we can compile it into a jar, add the jar to Hive, create the function, and use it to build the t_expand_state table:

[sourcecode language="SQL"]
create function expand_tree as 'com.pythian.nikotin.scala.ExpandTree2UDTF';

insert overwrite table t_expand_state
select expand_tree(state, next_state) from t_state;
[/sourcecode]
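As a quick sanity check (a hypothetical session, assuming t_state has been loaded with the small sample tree from earlier), the expanded table should contain exactly the eight rows shown above:

[sourcecode language="SQL"]
-- hypothetical check against the sample tree
select state1, state2, lvl
from t_expand_state
order by state1, lvl;
-- expected, e.g. for S4: (S4, S4, 0), (S4, S2, 1), (S4, S1, 2)
[/sourcecode]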
