资讯专栏INFORMATION COLUMN

元数据治理框架Atlas研究——数据写入过程源码分析

wanglu1209 / 767人阅读

摘要:概要通过函数进行数据写入,了解这个重要函数有助于理解工作流程优化写入性能。举个例子,的属性为的客户端向重复创建,属性的值为,每次值为当前时间戳而不同,造成元数据重复更新。

概要
Atlas通过AtlasEntityStoreV2.createOrUpdate函数进行数据写入,了解AtlasEntityStoreV2.createOrUpdate这个重要函数有助于理解Atlas工作流程、优化写入性能。本文主要梳理createOrUpdate中各子模块的功能和逻辑。

源码解析
函数格式:EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications)
数据写入过程中的上下文信息存于EntityGraphDiscoveryContext context对象中,数据格式如下:

public final class EntityGraphDiscoveryContext {
    private final AtlasTypeRegistry               typeRegistry;
    private final EntityStream                    entityStream;
    private final List                    referencedGuids          = new ArrayList<>();
    private final Set              referencedByUniqAttribs  = new HashSet<>();
    private final Map        resolvedGuids            = new HashMap<>();
    private final Map resolvedIdsByUniqAttribs = new HashMap<>();
    private final Set                     localGuids               = new HashSet<>();

step1:通过preCreateOrUpdate函数对entityStream预处理:找出entityStream中所有entity的AtlasObjectId,判断entity需要进行创建还是更新操作,创建新vertex

a) AtlasEntityGraphDiscoveryV2.discover,遍历所有entity的AtlasObjectId,放入到context中:

  情况1)AtlasObject.guid!=null时将guid放入context.referencedGuids中;
  情况2)若AtlasObject.id为null,则表示该entity已经写入图数据库,将AtlasObject放入context.referencedByUniqAttribs    

b) AtlasEntityGraphDiscoveryV2.resolveReferences,判断entity是否在图数据库中存在:

  情况1)若context.referencedGuids中的guid在图数据库中存在对应vertex,将guid和vertex放入context.resolvedGuids中;
  情况2)若context.referencedGuids中的guid在图数据库中不存在对应vertex,将guid放入context.localGuidReference中;
  情况3)根据context.referencedByUniqAttribs中的AtlasObjectId找到对应顶点,将顶点放入resolvedIdsByUniqAttribs中,并将AtlasObjectId放入

c) 对不存在对应vertex的entity创建vertex,并放入resolvedGuids
d) 将需要创建的entity和需要更新的entity分别放入EntityMutationContext.entitiesCreated和EntityMutationContext.entitiesUpdated中

step2:对比entityStream中的entity和图数据库中的vertex,判断是否有属性发生变化,忽略不需要更新的entity,核心代码:

for(AtlasAttribute attribute:entityType.getAllAttributes().values()){
        if(!entity.getAttributes().containsKey(attribute.getName())){  // if value is not provided, current value will not be updated
        continue;
        }

        Object newVal=entity.getAttribute(attribute.getName());
        Object currVal=entityRetriever.getEntityAttribute(vertex,attribute);

        if(!attribute.getAttributeType().areEqualValues(currVal,newVal,context.getGuidAssignments())){
        hasUpdates=true;

        if(LOG.isDebugEnabled()){
        LOG.debug("found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}",guid,entity.getTypeName(),attribute.getName(),currVal,newVal);
        }

        break;
        }
}

注意:当attribute为referred AtlasObjectId时,客户端定义的guid是UnAssigned的,必然和数据库中存储的guid值不同,这会造成元数据的重复更新,这个目前Atlas有待改进的地方。举个例子,Hive_Table的db属性为Hive_Db的AtlasObjectId, 客户端向Atlas Server重复创建Hive_Table,db属性的值为AtlasObjectId{typeName=hive_db, guid=-1554620941},每次guid值为当前时间戳而不同,造成table元数据重复更新。

step3:操作权限验证:通过AtlasAuthorizationUtils.verifyAccess函数验证发起请求的用户是否有权限对各个entity进行写操作

step4:EntityGraphMapper.mapAttributesAndClassifications为vertex更新attributes,关于entity和vertex属性的映射,可以参考文章元数据治理框架Atlas研究——JanusGraph图数据库对象关系映射

step5:通过entityChangeNotifier.onEntitiesMutated为vertex创建全文索引,并通知audit模块记录所有的变更操作
注:在整个数据写入过程中,创建全文索引这一步骤会占用超过60%的时间,如果在实际使用中不需要用全文索引的功能,可以修改源码注释掉相应doFullTextMapping函数

step6:整个数据写入过程中,我们并未提到Atlas如何调用JanusGraph的api来向图数据库写入数据。其实,Atlas只需要通过JanusGraph api中的vertex、edge对象维护数据的图结构即可。Atlas对数据读写函数都添加了@GraphTransaction注解,这个注解确保在函数运行结束后调用graph.commit()函数将当前事务内的变更提交图数据库。具体的实现可以见GraphTransactionInterceptor类。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74085.html

相关文章

  • 数据治理框架Atlas研究——JanusGraph图数据库对象关系映射

    摘要:把元数据信息分成和两类,其中表示对象自身的属性如的等,表示对象和其他元数据对象的关系如所属的。这样设计确保在查询元数据时选择是否忽略。具体实现以函数为例分析如何根据唯一属性查找元数据对象。 概要Atlas采用了分布式图数据库JanusGraph作为数据存储,目的在于用有向图灵活的存储、查询数据血缘关系。Atlas定义了一套atlas-graphdb-api,允许采用不同的图数据库引擎来...

    CoffeX 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<