《武汉工程大学学报》  2017年05期 508-513   出版日期:2017-12-19   ISSN:1674-2869   CN:42-1779/TQ
消除规范关系连接冗余的二次排序算法研究


MapReduce在对规范的一对多关系进行连接操作时,一方关系的各个属性值会在连接的结果中产生大量的冗余,为消除冗余,可利用HBase表的稀疏存储特性,将一方关系的各个属性值只存储一次,同时将其对应的多方关系进行按列多次存储. 实现的过程可借鉴二次排序算法的思想,让一方和多方关系在Map端进行连接后,输出的Key既包含一方关系的属性,又包含多方关系中可排序的值,从而使得Reduce端在规约时可将Key包含的一方关系的属性值及多方关系的经过二次排序后的属性值直接写入HBase表. MapReduce是Google?在2004年提出一个用于处理大数据的分布式计算框架,其数据处理的流程分为Map、Shuffle及Reduce三个阶段. 在Map阶段,原始数据源根据其数据特征被划分成若干数据块,每个数据块由集群中的节点进行Map逻辑处理,结果以Key/Value(键/值对)的形式输出. Shuffle阶段负责对Key/Value对进行排序及分组,Map阶段的排序发生在将节点内存缓冲区的key/Value写入到本地磁盘spill文件,及将多个本地磁盘spill文件合并为一个spill文件时,排序的过程为:首先根据key所属的Partition (分区)排序,每个Partition再按Key进行排序. Map阶段完成后,每个的Partition会被拷贝到对应的Reduce节点,由于Reduce节点会接受来自多个Map节点的数据,故Shuffle在Reduce阶段的任务就是将来自不同Map节点的Partition按Key值进行归并排序后,将Key/Value根据Key值分组为[Key,List(Value1,Value2…Valuen)]作为Reduce任务的输入. Reduce阶段负责对[Key,List< Value1,Value2…Valuen]按特定逻辑进行规约处理,并将结果输出[1-2]. Hadoop MapReduce 是Google MapReduce框架的开源实现[3],通过对Hadoop MapReduce进行扩展,可以将HBase与MapReduce进行集成,从而使得HBase数据表和外界数据源可以以MapReduce?的方式进行双向交互,从而提高数据的处理速度和效率. HBase 是建立在Hadoop之上,具有高可靠性、高性能、列存储、可伸缩、实时读写特点的数据库系统,能够为海量的数据提供高性能的数据维护及查询服务[4-8]. 可以利用MapReduce将具有相同属性的文件进行连接操作,根据参与连接的文件大小可选择使用Reduce端连接、Map端连接、Semi(半)连接及Reduce端+Bloom Filter连接,连接的结果可以写入文本文件,也可以直接写入HBase数据表. 由于MapReduce的Shuffle过程默认按连接结果的Key进行排序,若需要对Value也进行排序,则需要重新定义Shuffle的排序和分组过程,进行二次排序,从而使得连接的结果首先按 Key排序,然后再按Value排序[9-15]. 1 改进的二次排序算法 假设规范的一方关系为M(MKEY,MATT1,MATT2,…,MATTn),多方关系为S(SKEY,MKEY ,SATT,SVALUE),其中MKEY为关系S的外码,(mkey[m],matt1[m],matt2[m],…, mattn[m]),m∈[1,n]表示关系M的一个元组,关系S中的SATT属性的取值范围为{satt1,satt2,…,sattn},SValue的取值范围为{svalue1,svalue2,…,svaluen},且按序号从小到大排序的整型值. 则使用MKEY对关系M和S进行连接操作,并根据SVALUE值进行二次排序后的结果如下所示: (mKey[1] matt1[1] matt2[1])… mattn[1]) satt1 svalue1 (mKey[1] matt1[1] matt2[1])… mattn[1]) satt2 svalue2 (mKey[1] matt1[1] matt2[1])… mattn[1]) satt3 svalue3 (mKey[1] matt1[1] matt2[1])… mattn[1]) satt4 svalue4 (mKey[2] matt1[2] matt2[2])… mattn[2]) satt4 svalue1 (mKey[2] matt1[2] matt2[2])… mattn[2]) satt3 svalue3 (mKey[2] matt1[2] matt2[2])… mattn[2]) satt1 svalue4 (mKey[3] matt1[3] matt2[3])… mattn[3]) satt4 svalue3 (mKey[3] matt1[3] matt2[3])… mattn[3]) satt1 svalue3 … 可见一方关系M的每个元组的各个属性会在连接结果中产生了大量冗余,消除冗余的方法是将连接的结果进行转换,写入HBase的数据表. 由于HBase表是按列存储的,在定义表结构时只需要定义列族(Column Family),对属于列族的列的数量没有限制,以ColumnFamily:Qualifier的形式表示一个列名,Qualifier可以是任意的字节数组. 因此可以以S:satt[k]列,k∈[1,n],S为多方关系名,satt[k]为S中的SATT属性的在连接结果中的值,来存储连接结果中的多方关系SVALUE属性的值. 对于连接结果中的一方关系,提取mkey[m],m∈[1,n]为HBase表的行健,以M:MATT[m]列,m∈[1,n],M为一方关系名,MATT[m]为M中的MATT属性名,来存储一方关系在连接结果中和mkey[m]对应的(matt1[m],matt2[m],…, mattn[m])属性值,从而使得一方关系的连接结果只存储了一次,既实现了连接的语义,又消除了冗余. HBase存储列值时默认按列名进行排序,故经过二次排序后的连接结果的svalue[t],t∈[1,n]可能不会按照排序后的次序进行存储,可增加M:Seq列存储排序后的svalue[t]值及其与satt[k],k∈[1,n]的对应关系. 经过二次排序后的连接结果在HBase表中的存储结构如图1所示. 实现将一对多的规范关系进行连接,二次排序后,直接写入HBase表,其过程如下. 1.1 自定义组合键 MapReduce 的Shuffle阶段只能按Key对数据进行排序,因此若需要在对Key进行排序后,再对Value进行排序,必须使得Map阶段输出的Key包含多方关系S中的SVALUE值,为将一方关系M的各个属性写入HBase表,Key还要包含一方关系M的各个属性值,自定义的组合键如下所示: class CombinationKey implements WritableComparable { Text firstKey; IntWritable secondKey; //读取及存储部分代码省略... } 其中firstKey存储M关系中的MKEY及各个MATT属性值以字符“\t”分隔的字符串,mkey为以“\t”分隔的第一个子字符串,secoondKey存储S关系中的SVALUE值. 1.2 实现Map端连接 由于一方关系M的数据一般较小,故可将其数据文件复制多份,让每个map 节点内存中保存一份(如存放到HashMap中),然后扫描多方关系S:对于S中的每一条记录,在HashMap中查找是否有相同的MKEY的记录,如果有,则连接后输出. 在MapReduce的任务启动时,通过job.addCacheFile(“hdfs://namenode:9000/M.txt”)指定要复制的一方数据文件M.txt,JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上,Map函数在重载的setup方法中通过context.getCacheFiles()可以获取到缓存到本地的文件. 实现Map端连接的过程如下: //定义Map的输出为< CombinationKey, Text> HBaseMapper extends Mapper {private HashMapcache_M = new HashMap(); protected void setup(Context context) { BufferedReader br = null; URI[] distributePaths = context.getCacheFiles(); String mInfo = null;File mFile = new File("./M.txt"); br = new BufferedReader(new FileReader(mFile.getPath())); //读缓存文件,并放到内存中 while(null!=(mInfo =br.readLine())){ String[] mParts = mInfo.split("\t"); cache_M.put(mParts [0], mInfo)} //mPart[0]为MKEY值 } protected void map(LongWritable key, Text value, Context context) { string mkey = 得到S关系每一行数据的MKEY值; string sattr =得到S关系每一行数据的SATT值; IntWritable svalue =得到S关系每一行数据的SVALUE值; Text mInfo = new Text(cache_M.get(mkey)); if(mInfo != null){ CombinationKey cbkey = new CombinationKey(); cbkey.setFirstKey(mInfo);cbkey.setSecondKey(svalue); context.write(cbkey, new Text(sattr + “\t” + svalue)); }}} 1.3 重新定义分区函数和排序依据及分组函数 由于在Map端就进行一方关系M和多方关系S的连接操作,故需要重新定义分区函数,使得firstKey中具有相同mkey的连接结果分到同一个区(Partition),自定义分区类的定义如下: class CusPartition extends Partitioner{ public int getPartition(CombinationKey key, Text value,int numPartitions) { string mkey = 取出key中firstKey部分的mkey; //mkey为分组依据 return (mkey.hashCode()&Integer.MAX_VALUE)%numPartitions; } }// numPartitions的值为集群中reduce节点的数量. 在Map和Reduce阶段都需要对处在同一个分区的连接结果首先按firstKey中的mkey进行排序,再按secondKey进行排序,自定义的排序比较类定义如下: class CusComparator extends WritableComparator { public int compare(WritableComparable cbKeyOne, WritableComparable cbKeyTwo) { CombinationKey c1 = (CombinationKey) cbKeyOne; CombinationKey c2 = (CombinationKey) cbKeyTwo; string mkey1 =从c1中取出firstKey部分的mkey;IntWritabe svalue1=取出c1中的secondKey; string mkey2=从c2中取出firstKey部分的mkey;IntWritabe svalue2=取出c2中的secondKey; if(!mkey1.equals(mkey2)){return mkey1.compareTo(mkey2); } //以字符方式比较mkey else{ return svalue1-svalue2; //以数值方式比较secondKey} }} 比较方法返回值值分别以小于零的值、零值、大于零的值表示小于、等于和大于. 在Reduce阶段将具有相同combinationkey的连接结果分在同一组,形成[combinationkey, List(sattr[k] “\t”svalue1, sattr[k] “\t”svalue2...sattr[k] “\t”svaluen)],k∈[1,n]. 分组的依据仍然为firstkey的mkey部分,自定义的分组类如下所示: class CusGrouping extends WritableComparator{ public int compare(WritableComparable cbKeyOne, WritableComparable cbKeyTwo) { CombinationKey c1 = (CombinationKey) cbKeyOne; CombinationKey c2 = (CombinationKey) cbKeyTwo; string mkey1 =从c1中取出firstKey部分的mkey; string mkey2=从c2中取出firstKey部分的mkey; return mkey1.compareTo(mkey2); }} 1.4 在Reduce阶段将连接结果写入HBase表 Reduce首先对输入的combinationkey进行分解,取出firstKey中的mkey作为HBase 表的行健,然后将firstKey中的其它属性值依次以M:MATT[m],m∈[1,n]列存储. 对已经按svalue排序好的集合List(sattr[k] “\t”svalue1, sattr[k] “\t”svalue2...sattr[k] “\t”svaluen),以S:sattr[k],k∈[1,n]列存储对应的svalue值. 由于HBase默认按列的名称S:satt[k]进行排序,故存储的次序可能与排序的结果不一致,可以增加 M:seq列,存储排序后的svalue值,Reduce的过程定义如下: class SCHBTReducer extends TableReducer{ public void reduce(CombinationKey key,Iterable values,Context context){ string mSeq = “”; string[] mParts = key.getFirstKey().toString().split("\t"); Put put = new Put(mParts[0].getBytes());//行健为mkey for(int i=1;i