资讯专栏INFORMATION COLUMN

GuavaCache

smartlion / 985人阅读

摘要:执行将异常后的删除。根据元素个数上限进行清理的策略思路在新缓存值的时候比对下是否缓存容量元素个数已经达到上限,如果达到上限按照算法进行淘汰元素。在读写完成后会进行通知源码会回调进行缓存元素删除后置处理。

Google Guava LocalLoadingCache 前言

在我们编程的过程中会遇到一些在程序中需要重试使用的数据,在这种情况下我们就可以考虑利用缓存(内存)的优势来提供程序访问这些数据的一个性能了。利用了缓存可以在一定程度上缓解很大的性能消耗:

网络传输开销

数据序列化反序列话

数据库、文件系统数据访问慢

缓存器是利用内存进行数据存储的,在存储容量上有一定的限制,所以我们在我们使用缓存的时候也分两种场景:

全量数据缓存

缓存热数据,这也是基于缓存容量的一个考虑

好了本篇我们就来聊聊写程序过程中常能用到的本地缓存的方式。

JDK提供的数据结构(Map)

缓存数据的存储格式一般都是以Key-Value的方式,那这里我们主要来讨论下Map的实现ConcurrentHashMap实现的缓存。

String key = StringUtils.EMPTY;
ConcurrentMap localCache  = new ConcurrentHashMap();
if(StringUtils.isEmpty(localCache.get(key))) {
    String value = queryFromDB(key);
    localCache.put(key,value);
    return value;
}
return localCache.get(key);

这样就能构造一个非常简单的缓存。

注意:这个缓存还是有非常多的问题

没有一个清除缓存的策略,最终所有被访问过得数据都会全量给缓存起来,直到显式清除。

同时缓存没命中的情况下需要应用显式去加载(queryFromDB )。

LocalLoadingCache

好了主角要登场了,先简单介绍下这个cache的一些用法,这个cache比较好的解决了我上面提到通过Map用作缓存的两个缺陷。

用法
LoadingCache graphs = CacheBuilder.newBuilder()
    .maximumSize(10000)
    .expireAfterWrite(10, TimeUnit.MINUTES)
    .removalListener(MY_LISTENER)
    .build(
        new CacheLoader() {
          public Graph load(Key key) throws AnyException {
            return createExpensiveGraph(key);
          }
        });

通过这种方式一个缓存就已经创建好了,上面定义的load函数在缓存中不存在key对应的value的时候会去执行将数据load放到缓存中。

其底层存储采用基于数组的java.util.concurrent.atomic.AtomicReferenceArray进行缓存元素的存取。

load如何被加载

先分析下load函数是怎么被执行的:graphs.getUnchecked(new Key());从缓存中获取数据,如果没有进行put操作,首次get的时候缓存中没有其缓存值,这个时候必然要触发load函数进行value load了,那我们就从get函数进行深入分析(分析源码基于16.0.1)。

com.google.common.cache.LocalCache.Segment#get(K, int, com.google.common.cache.CacheLoader)

V get(K key, int hash, CacheLoader loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {
        if (count != 0) { // read-volatile
          // don"t call getLiveEntry, which would ignore loading values
          ReferenceEntry e = getEntry(key, hash);
          if (e != null) {
            long now = map.ticker.read();
            V value = getLiveValue(e, now);
            if (value != null) {
              recordRead(e, now);
              statsCounter.recordHits(1);
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            ValueReference valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // at this point e is either null or expired;
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();
      }
    }

首次调用会执行lockedGetOrLoad函数

V lockedGetOrLoad(K key, int hash, CacheLoader loader)
        throws ExecutionException {
      ReferenceEntry e;
      ValueReference valueReference = null;
      LoadingValueReference loadingValueReference = null;
      boolean createNewEntry = true;

      lock();
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count - 1;
        AtomicReferenceArray> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry first = table.get(index);

        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              createNewEntry = false;
            } else {
              V value = valueReference.get();
              if (value == null) {
                enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
              } else if (map.isExpired(e, now)) {
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let"s accomodate an incorrect expiration queue.
                enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
              } else {
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // we were concurrent with loading; don"t consider refresh
                return value;
              }

              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }

        if (createNewEntry) {
          loadingValueReference = new LoadingValueReference();

          if (e == null) {
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        unlock();
        postWriteCleanup();
      }

      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
          synchronized (e) {
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
          statsCounter.recordMisses(1);
        }
      } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
      }
    }

最后调用loadSync(key, hash, loadingValueReference, loader);进行进行数据load。

public ListenableFuture loadFuture(K key, CacheLoader loader) {
      stopwatch.start();
      V previousValue = oldValue.get();
      try {
        if (previousValue == null) {
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        ListenableFuture newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return Futures.transform(newValue, new Function() {
          @Override
          public V apply(V newValue) {
            LoadingValueReference.this.set(newValue);
            return newValue;
          }
        });
      } catch (Throwable t) {
        if (t instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        return setException(t) ? futureValue : fullyFailedFuture(t);
      }
    }

执行loader.load将数据load进缓存,可能你会想如果这个时候从DB或其他非内存存储中也没找到数据,这个时候LocalLoadingCache是怎么处理的呢?其实在这种情况下只需要throw异常信息就好,这样LocalLoadingCache会放弃缓存。

但是读源代码细心的你可能会发现在lockedGetOrLoad中会先newEntry后面才load

if (createNewEntry) {
      loadingValueReference = new LoadingValueReference();

      if (e == null) {
        e = newEntry(key, hash, first);
        e.setValueReference(loadingValueReference);
        table.set(index, e);
      } else {
        e.setValueReference(loadingValueReference);
      }
    } finally {
        unlock();
        postWriteCleanup();
}

  if (createNewEntry) {
    try {
      // Synchronizes on the entry to allow failing fast when a recursive load is
      // detected. This may be circumvented when an entry is copied, but will fail fast most
      // of the time.
      synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
      }
    } finally {
      statsCounter.recordMisses(1);
    }
  } else {
    // The entry already exists. Wait for loading.
    return waitForLoadingValue(e, key, valueReference);
  }

其实实现很简单他在cache到异常信息后又会对缓存中的entry进行remove操作,当时找这段异常被cache的代码也是找了很久时间了。

com.google.common.cache.LocalCache.Segment#getAndRecordStats

V getAndRecordStats(K key, int hash, LoadingValueReference loadingValueReference,
        ListenableFuture newValue) throws ExecutionException {
      V value = null;
      try {
        value = getUninterruptibly(newValue);
        if (value == null) {
          throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
        }
        statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
        storeLoadedValue(key, hash, loadingValueReference, value);
        return value;
      } finally {
        if (value == null) {
          statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
          removeLoadingValue(key, hash, loadingValueReference);
        }
      }
    }

执行removeLoadingValue将load异常后的key删除。

缓存策略

从用法那小结可以看到我们在创建缓存的时候除了load还有一些其他特性如下:

maximumSize(10000)
expireAfterWrite(10, TimeUnit.MINUTES)

这又是什么意思呢?这其实就是LocalLoadingCache提供的缓存策略。

maximumSize(10000) 设置缓存能保存的最多元素数量。
expireAfterWrite(10, TimeUnit.MINUTES) 设置元素在写后多久进行销毁。

其实还有maximumWeight、expireAfterAccess两种元素过期策略。

maximumSizemaximumWeight的一种特殊形式,将所有的元素设置weight为1,也即就转化为能存储元素个数的上限值了。

expireAfterAccessexpireAfterWrite基本就一个意思,只是内部用了两种不同的计数方式(通过不同的queue进行管理,被访问/修改进行入队操作)进行访问、写操作的记录。

不多说让源码说话。

根据过期时间进行缓存的淘汰策略思路:在进行get/put操作完成后对队列(每次对缓存的操作头会被其记录下来)进行一次遍历,然后按照过期时间淘汰过期的元素。

根据元素个数上限进行清理的策略思路:在load新缓存值的时候比对下是否缓存容量(元素个数)已经达到上限,如果达到上限按照LRU算法进行淘汰元素。

过期时间淘汰策略

从分析load那小结我们已经展示过get的代码,其中最后finally中有段postReadCleanup();方法,深入下去方法体就不然看出:

@GuardedBy("Segment.this")
void expireEntries(long now) {
  drainRecencyQueue();

  ReferenceEntry e;
  while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
  while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
}

进行过期key清除策略,从这段代码也能看出我为什么说expireAfterAccessexpireAfterWrite基本就一个意思了吧。

其实还有一种清除缓存的策略:基于引用的回收但是还没研究清除不便多说,这个策略清除的时机和过期时间策略一样。

@GuardedBy("Segment.this")
void drainReferenceQueues() {
  if (map.usesKeyReferences()) {
    drainKeyReferenceQueue();
  }
  if (map.usesValueReferences()) {
    drainValueReferenceQueue();
  }
}

容量回收策略

在新key对应的value load完后需要将value存放到缓存中去,插入完成后会进行容量的check如果超过容量限制会执行淘汰策略。对应源码:

com.google.common.cache.LocalCache.Segment#storeLoadedValue

boolean storeLoadedValue(K key, int hash, LoadingValueReference oldValueReference,
        V newValue) {
      lock();
      try {
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count + 1;
        if (newCount > this.threshold) { // ensure capacity
          expand();
          newCount = this.count + 1;
        }

        AtomicReferenceArray> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry first = table.get(index);

        for (ReferenceEntry e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            ValueReference valueReference = e.getValueReference();
            V entryValue = valueReference.get();
            // replace the old LoadingValueReference if it"s live, otherwise
            // perform a putIfAbsent
            if (oldValueReference == valueReference
                || (entryValue == null && valueReference != UNSET)) {
              ++modCount;
              if (oldValueReference.isActive()) {
                RemovalCause cause =
                    (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED;
                enqueueNotification(key, hash, oldValueReference, cause);
                newCount--;
              }
              setValue(e, key, newValue, now);
              this.count = newCount; // write-volatile
              evictEntries();
              return true;
            }

            // the loaded value was already clobbered
            valueReference = new WeightedStrongValueReference(newValue, 0);
            enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED);
            return false;
          }
        }

        ++modCount;
        ReferenceEntry newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, newValue, now);
        table.set(index, newEntry);
        this.count = newCount; // write-volatile
        evictEntries();
        return true;
      } finally {
        unlock();
        postWriteCleanup();
      }
    }

上面的存储操作最终在进行setValue后会执行:

com.google.common.cache.LocalCache.Segment#evictEntries

@GuardedBy("Segment.this")
void evictEntries() {
  if (!map.evictsBySize()) {
    return;
  }

  drainRecencyQueue();
  while (totalWeight > maxSegmentWeight) {
    ReferenceEntry e = getNextEvictable();
    if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
      throw new AssertionError();
    }
  }
}
// TODO(fry): instead implement this with an eviction head
ReferenceEntry getNextEvictable() {
  for (ReferenceEntry e : accessQueue) {
    int weight = e.getValueReference().getWeight();
    if (weight > 0) {
      return e;
    }
  }
  throw new AssertionError();
}

这里最终会根据LRU从缓存中将最近没有使用过的元素进行剔除操作。

最后说下removalListener

在LocalLoadingCache中提供了在元素被移除的时候供应用进行回调的函数,这个函数通过removalListener进行注册,当有元素从缓存中淘汰后就会触发其进行调用。

接着上面移除元素进行分析函数removeEntry

@GuardedBy("Segment.this")
boolean removeEntry(ReferenceEntry entry, int hash, RemovalCause cause) {
  int newCount = this.count - 1;
  AtomicReferenceArray> table = this.table;
  int index = hash & (table.length() - 1);
  ReferenceEntry first = table.get(index);

  for (ReferenceEntry e = first; e != null; e = e.getNext()) {
    if (e == entry) {
      ++modCount;
      ReferenceEntry newFirst = removeValueFromChain(
          first, e, e.getKey(), hash, e.getValueReference(), cause);
      newCount = this.count - 1;
      table.set(index, newFirst);
      this.count = newCount; // write-volatile
      return true;
    }
  }

  return false;
}

最终会调用

@GuardedBy("Segment.this")
void enqueueNotification(@Nullable K key, int hash, ValueReference valueReference,
    RemovalCause cause) {
  totalWeight -= valueReference.getWeight();
  if (cause.wasEvicted()) {
    statsCounter.recordEviction();
  }
  if (map.removalNotificationQueue != DISCARDING_QUEUE) {
    V value = valueReference.get();
    RemovalNotification notification = new RemovalNotification(key, value, cause);
    map.removalNotificationQueue.offer(notification);
  }
}

将建立一个RemovalNotification队列进行保存删除元素。

在读/写完成后会进行通知

com.google.common.cache.LocalCache.Segment#postWriteCleanup

 /**
 * Performs routine cleanup following a write.
 */
void postWriteCleanup() {
  runUnlockedCleanup();
}

void cleanUp() {
  long now = map.ticker.read();
  runLockedCleanup(now);
  runUnlockedCleanup();
}

runUnlockedCleanup源码会回调com.google.common.cache.RemovalListener#onRemoval进行缓存元素删除后置处理。

void processPendingNotifications() {
    RemovalNotification notification;
    while ((notification = removalNotificationQueue.poll()) != null) {
      try {
        removalListener.onRemoval(notification);
      } catch (Throwable e) {
        logger.log(Level.WARNING, "Exception thrown by removal listener", e);
      }
    }
  }
最后类图一张


觉得图不够清晰可以点击查看大图。

总结

本篇也主要是对LocalLoadingCache从运用这个层次更向前走了一步,对我们使用过程其逻辑背后的实现进行了一定深入分析。我在初次看到这个方式也是很疑惑其底层到底是如何实现的,于是有了这篇文章,通过源码进行跟踪分析其背后的实现逻辑。

后面还会分析org.springframework.cache.guava.GuavaCacheManager如何将GuavaCache进行管理的,通过和spring更好的结合而消除显式调用cache get/put的方式。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67469.html

相关文章

  • GuavaCache

    摘要:执行将异常后的删除。根据元素个数上限进行清理的策略思路在新缓存值的时候比对下是否缓存容量元素个数已经达到上限,如果达到上限按照算法进行淘汰元素。在读写完成后会进行通知源码会回调进行缓存元素删除后置处理。 Google Guava LocalLoadingCache 前言 在我们编程的过程中会遇到一些在程序中需要重试使用的数据,在这种情况下我们就可以考虑利用缓存(内存)的优势来提供程序访...

    xiaowugui666 评论0 收藏0
  • Guava学习:Cache缓存入门

    摘要:并且添加了监听器,当数据被删除后会打印日志。六总结回顾缓存加载显示插入缓存回收,定时,,软弱引用,显示删除接口方法,监听器清理缓存时间只有在获取数据时才或清理缓存,使用者可以单起线程采用方法主动清理。 摘要: 学习Google内部使用的工具包Guava,在Java项目中轻松地增加缓存,提高程序获取数据的效率。 一、什么是缓存? 根据科普中国的定义,缓存就是数据交换的缓冲区(称作Cach...

    xingpingz 评论0 收藏0
  • Guava学习:Cache缓存

    摘要:并且添加了监听器,当数据被删除后会打印日志。六总结回顾缓存加载显示插入缓存回收,定时,,软弱引用,显示删除接口方法,监听器清理缓存时间只有在获取数据时才或清理缓存,使用者可以单起线程采用方法主动清理。 摘要: 学习Google内部使用的工具包Guava,在Java项目中轻松地增加缓存,提高程序获取数据的效率。一、什么是缓存?根据科普中国的定义,缓存就是数据交换的缓冲区(称作Cache)...

    PumpkinDylan 评论0 收藏0
  • springboot集成内存cache

    摘要:依赖这里使用配置配置文件配置配置文件配置使用 maven依赖 org.springframework.boot spring-boot-starter-cache com.google.guava guava 19...

    Youngdze 评论0 收藏0
  • SpringBoot手动使用EhCache

    摘要:的配置文件,使用前缀的属性进行配置。在方法的调用前并不会检查缓存,方法始终都会被调用。手动使用在实际开发过程中,存在不使用注解,需要自己添加缓存的情况。如果该属性值为,则表示对象可以无限期地存在于缓存中。 SpringBoot在annotation的层面实现了数据缓存的功能,基于Spring的AOP技术。所有的缓存配置只是在annotation层面配置,像声明式事务一样。 Spring...

    Hydrogen 评论0 收藏0

发表评论

0条评论

smartlion

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<