摘要:概要通过函数进行数据写入,了解这个重要函数有助于理解工作流程优化写入性能。举个例子,的属性为的客户端向重复创建,属性的值为,每次值为当前时间戳而不同,造成元数据重复更新。
概要
Atlas通过AtlasEntityStoreV2.createOrUpdate函数进行数据写入,了解AtlasEntityStoreV2.createOrUpdate这个重要函数有助于理解Atlas工作流程、优化写入性能。本文主要梳理createOrUpdate中各子模块的功能和逻辑。
源码解析
函数格式:EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications)
数据写入过程中的上下文信息存于EntityGraphDiscoveryContext context对象中,数据格式如下:
public final class EntityGraphDiscoveryContext { private final AtlasTypeRegistry typeRegistry; private final EntityStream entityStream; private final ListreferencedGuids = new ArrayList<>(); private final Set referencedByUniqAttribs = new HashSet<>(); private final Map resolvedGuids = new HashMap<>(); private final Map resolvedIdsByUniqAttribs = new HashMap<>(); private final Set localGuids = new HashSet<>();
step1:通过preCreateOrUpdate函数对entityStream预处理:找出entityStream中所有entity的AtlasObjectId,判断entity需要进行创建还是更新操作,创建新vertex
a) AtlasEntityGraphDiscoveryV2.discover,遍历所有entity的AtlasObjectId,放入到context中:
情况1)AtlasObject.guid!=null时将guid放入context.referencedGuids中; 情况2)若AtlasObject.id为null,则表示该entity已经写入图数据库,将AtlasObject放入context.referencedByUniqAttribs
b) AtlasEntityGraphDiscoveryV2.resolveReferences,判断entity是否在图数据库中存在:
情况1)若context.referencedGuids中的guid在图数据库中存在对应vertex,将guid和vertex放入context.resolvedGuids中; 情况2)若context.referencedGuids中的guid在图数据库中不存在对应vertex,将guid放入context.localGuidReference中; 情况3)根据context.referencedByUniqAttribs中的AtlasObjectId找到对应顶点,将顶点放入resolvedIdsByUniqAttribs中,并将AtlasObjectId放入
c) 对不存在对应vertex的entity创建vertex,并放入resolvedGuids
d) 将需要创建的entity和需要更新的entity分别放入EntityMutationContext.entitiesCreated和EntityMutationContext.entitiesUpdated中
step2:对比entityStream中的entity和图数据库中的vertex,判断是否有属性发生变化,忽略不需要更新的entity,核心代码:
for(AtlasAttribute attribute:entityType.getAllAttributes().values()){ if(!entity.getAttributes().containsKey(attribute.getName())){ // if value is not provided, current value will not be updated continue; } Object newVal=entity.getAttribute(attribute.getName()); Object currVal=entityRetriever.getEntityAttribute(vertex,attribute); if(!attribute.getAttributeType().areEqualValues(currVal,newVal,context.getGuidAssignments())){ hasUpdates=true; if(LOG.isDebugEnabled()){ LOG.debug("found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}",guid,entity.getTypeName(),attribute.getName(),currVal,newVal); } break; } }
注意:当attribute为referred AtlasObjectId时,客户端定义的guid是UnAssigned的,必然和数据库中存储的guid值不同,这会造成元数据的重复更新,这个目前Atlas有待改进的地方。举个例子,Hive_Table的db属性为Hive_Db的AtlasObjectId, 客户端向Atlas Server重复创建Hive_Table,db属性的值为AtlasObjectId{typeName=hive_db, guid=-1554620941},每次guid值为当前时间戳而不同,造成table元数据重复更新。
step3:操作权限验证:通过AtlasAuthorizationUtils.verifyAccess函数验证发起请求的用户是否有权限对各个entity进行写操作
step4:EntityGraphMapper.mapAttributesAndClassifications为vertex更新attributes,关于entity和vertex属性的映射,可以参考文章元数据治理框架Atlas研究——JanusGraph图数据库对象关系映射
step5:通过entityChangeNotifier.onEntitiesMutated为vertex创建全文索引,并通知audit模块记录所有的变更操作
注:在整个数据写入过程中,创建全文索引这一步骤会占用超过60%的时间,如果在实际使用中不需要用全文索引的功能,可以修改源码注释掉相应doFullTextMapping函数
step6:整个数据写入过程中,我们并未提到Atlas如何调用JanusGraph的api来向图数据库写入数据。其实,Atlas只需要通过JanusGraph api中的vertex、edge对象维护数据的图结构即可。Atlas对数据读写函数都添加了@GraphTransaction注解,这个注解确保在函数运行结束后调用graph.commit()函数将当前事务内的变更提交图数据库。具体的实现可以见GraphTransactionInterceptor类。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/74085.html
摘要:把元数据信息分成和两类,其中表示对象自身的属性如的等,表示对象和其他元数据对象的关系如所属的。这样设计确保在查询元数据时选择是否忽略。具体实现以函数为例分析如何根据唯一属性查找元数据对象。 概要Atlas采用了分布式图数据库JanusGraph作为数据存储,目的在于用有向图灵活的存储、查询数据血缘关系。Atlas定义了一套atlas-graphdb-api,允许采用不同的图数据库引擎来...
阅读 1147·2021-11-10 11:35
阅读 2888·2021-09-24 10:35
阅读 2893·2021-09-22 15:38
阅读 2784·2019-08-30 15:43
阅读 1306·2019-08-29 18:39
阅读 2525·2019-08-29 15:22
阅读 2767·2019-08-28 18:17
阅读 589·2019-08-26 13:37