资讯专栏INFORMATION COLUMN

爬虫框架WebMagic源码分析之Scheduler

TIGERB / 1968人阅读

摘要:包主要实现类,这是一个抽象类,实现了通用的模板方法,并在方法内部判断错误重试去重处理等。重置重复检查就是清空,获取请求总数也就是获取的。至于请求总数统计,就是返回中维护的的大小。

Scheduler是Webmagic中的url调度器,负责从Spider处理收集(push)需要抓取的url(Page的targetRequests)、并poll出将要被处理的url给Spider,同时还负责对url判断是否进行错误重试、及去重处理、以及总页面数、剩余页面数统计等。

主要接口:

Scheduler,定义了基本的push和poll方法。基本接口。

MonitorableScheduler,继承自Scheduler的接口,定义了获取剩余url请求数和总请求数的方法。便于监控。

core包主要实现类:

DuplicateRemovedScheduler,这是一个抽象类,实现了通用的push模板方法,并在push方法内部判断错误重试、去重处理等。去重策略采用的是HashSetDuplicateRemover类,这个会在稍后说明。

PriorityScheduler,内置两个优先级队列(+,-)和一个非优先级阻塞队列的调度器。

QueueScheduler,内置一个阻塞队列的调度器。这是默认采用的。

URL去重策略:

DuplicateRemover:去重接口,含有判断是否重复,重置重复检查,获取请求总数的方法。

HashSetDuplicateRemover:DuplicateRemover的实现类,内部维护了一个并发安全的HashSet。

先说下去重策略的具体实现。核心代码如下:

public class HashSetDuplicateRemover implements DuplicateRemover {
    private Set urls = Collections.newSetFromMap(new ConcurrentHashMap());
    @Override
    public boolean isDuplicate(Request request, Task task) {
        return !urls.add(getUrl(request));
    }
    。。。
    @Override
    public void resetDuplicateCheck(Task task) {
        urls.clear();
    }
    @Override
    public int getTotalRequestsCount(Task task) {
        return urls.size();
    }
}

去重策略类很简单,就是维护一个并发安全的HashSet。然后通过add方法是否成功来判断是否是重复的url。重置重复检查就是清空set,获取请求总数也就是获取set的size。简单明了。但是你以为去重就这么点,那么你错了。继续看。

public abstract class DuplicateRemovedScheduler implements Scheduler {
    private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();
    @Override
    public void push(Request request, Task task) {
        if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) {
            pushWhenNoDuplicate(request, task);
        }
    }
    protected boolean shouldReserved(Request request) {
        return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
    }

    protected boolean noNeedToRemoveDuplicate(Request request) {
        return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod());
    }
    protected void pushWhenNoDuplicate(Request request, Task task) {

    }
}

DuplicateRemovedScheduler是一个抽象类,提供了push的通用模板,并为子类提供了pushWhenNoDuplicate用于实现自己的策略。push方法用于同一处理去重和重试机制。
首先判断是否需要进行错误重试,如果需要,那么就直接push到队列中,否则判断请求是否为POST方法,如果是直接加入队列,(这里需要注意的是,POST请求的url不会被加入HashSetDuplicateRemover维护的urls集合,故而也不会被加入到最终的getTotalRequestsCount的统计中,所以最终我们获取的统计信息只是针对GET请求的。),否则进行去重判断。
根据不同调度器的实现,pushWhenNoDuplicate的实现方式不一样。
在PriorityScheduler中内置两个优先级队列(+,-)和一个非优先级阻塞队列的调度器,其pushWhenNoDuplicate代码如下:

public void pushWhenNoDuplicate(Request request, Task task) {
    if (request.getPriority() == 0) {
        noPriorityQueue.add(request);
    } else if (request.getPriority() > 0) {
        priorityQueuePlus.put(request);
    } else {
        priorityQueueMinus.put(request);
    }
}

根据Request是否设置priority属性,以及是否为正、负来决定加入到哪个队列中。因为这影响了后续poll的先后顺序。

在QueueScheduler中内置一个阻塞队列的调度器。其pushWhenNoDuplicate代码如下:

    public void pushWhenNoDuplicate(Request request, Task task) {
        queue.add(request);
    }

就是简单地将其加入队列中。

以上就是关于URL去重及push的机制,接下来说明poll思路:

在PriorityScheduler中,poll顺序为plus队列>noPriority队列>minus队列。

public synchronized Request poll(Task task) {
        Request poll = priorityQueuePlus.poll();
        if (poll != null) {
            return poll;
        }
        poll = noPriorityQueue.poll();
        if (poll != null) {
            return poll;
        }
        return priorityQueueMinus.poll();
    }

在QueueScheduler中,简单粗暴。

public Request poll(Task task) {
        return queue.poll();
    }

至于url请求总数统计,就是返回HashSetDuplicateRemover中维护的urls set的大小。这里再次罗嗦一次:最终我们获取的统计信息只是针对GET请求的。

public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }

当然extensions扩展模块中还有些Scheduler实现,比如RedisScheduler用作集群支持,FileCacheQueueScheduler用来断点续爬支持等。由于本系列文章是先分析核心包,后续分析扩展包,所以关于这部分,后续补充。

RedisScheduler
思路是采用set来存储已经抓取过的url,list来存储待抓url队列,hash来存储序列化数据(哈希中的键为url的SHA值,值为Request的json序列化字符串)。所有数据类型的键都是基于Spider的UUID来生成的,也就是说每个Spider实例所拥有的都是不同的。

 @Override
    public boolean isDuplicate(Request request, Task task) {
        Jedis jedis = pool.getResource();
        try {
            return jedis.sadd(getSetKey(task), request.getUrl()) > 0;
        } finally {
            pool.returnResource(jedis);
        }

    }

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
        Jedis jedis = pool.getResource();
        try {
            jedis.rpush(getQueueKey(task), request.getUrl());
            if (request.getExtras() != null) {
                String field = DigestUtils.shaHex(request.getUrl());
                String value = JSON.toJSONString(request);
                jedis.hset((ITEM_PREFIX + task.getUUID()), field, value);
            }
        } finally {
            pool.returnResource(jedis);
        }
    }

    @Override
    public synchronized Request poll(Task task) {
        Jedis jedis = pool.getResource();
        try {
            String url = jedis.lpop(getQueueKey(task));
            if (url == null) {
                return null;
            }
            String key = ITEM_PREFIX + task.getUUID();
            String field = DigestUtils.shaHex(url);
            byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());
            if (bytes != null) {
                Request o = JSON.parseObject(new String(bytes), Request.class);
                return o;
            }
                Request request = new Request(url);
            return request;
        } finally {
            pool.returnResource(jedis);
        }
    }
 @Override
    public int getLeftRequestsCount(Task task) {
        Jedis jedis = pool.getResource();
        try {
            Long size = jedis.llen(getQueueKey(task));
            return size.intValue();
        } finally {
            pool.returnResource(jedis);
        }
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        Jedis jedis = pool.getResource();
        try {
            Long size = jedis.scard(getSetKey(task));
            return size.intValue();
        } finally {
            pool.returnResource(jedis);
        }
    }

这些代码都很好理解,只要有点redis基础的都没问题,这里就不再赘述了。

至于RedisPriorityScheduler就是采用有序的zset来存储plus、min队列,list来存储noprioprity队列。

FileCacheQueueScheduler
思路是维护两个文件.cursor.txt,.urls.txt 前者由于存储一个数字,这个数字代表了读取.urls.txt的行数。后者用来存储所有的urls。初始化时从两个文件读取内存中,并初始化urls集和queue队列、同时初始化flush线程定时flush内容到文件中。当poll和pushWhenNoDuplicate时和原来逻辑差不多,只不过加了写文件的步骤。
需要注意的是:FileCacheQueueScheduler实现了自己的去重规则,而不是直接使用DuplicateRemovedScheduler父类的去重规则。不过原理都一样,都是通过Set来去重。

以上就是关于调度器的部分,下篇主题待定。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66885.html

相关文章

  • 爬虫框架Webmagic源码分析Spider

    摘要:获取正在运行的线程数,用于状态监控。之后初始化组件主要是初始化线程池将到中,初始化开始时间等。如果线程池中运行线程数量为,并且默认,那么就停止退出,结束爬虫。 本系列文章,针对Webmagic 0.6.1版本 一个普通爬虫启动代码 public static void main(String[] args) { Spider.create(new GithubRepoPageP...

    邹立鹏 评论0 收藏0
  • 爬虫框架WebMagic源码分析系列目录

    摘要:爬虫框架源码分析之爬虫框架源码分析之爬虫框架源码分析之爬虫框架源码分析之爬虫框架源码分析之之进阶 爬虫框架Webmagic源码分析之Spider爬虫框架WebMagic源码分析之Scheduler爬虫框架WebMagic源码分析之Downloader爬虫框架WebMagic源码分析之Selector爬虫框架WebMagic源码分析之SeleniumWebMagic之Spider进阶

    wayneli 评论0 收藏0
  • WebMagicSpider进阶

    摘要:实际运行中就发现了一个有趣的现象。爬虫抓取的速度超过了我用给它推送的速度,导致爬虫从获取不到同时此刻线程池所有线程都已停止。如何管理设置,避免返回,且没有工作线程时退出循环。退出检测循环说明结束了,手动调用来是退出调度循环,终止爬虫。 Webmagic源码分析系列文章,请看这里 从解决问题开始吧。 问题描述:由于数据库的数据量特别大,而且公司没有搞主从读写分离,导致从数据库读取数据比较...

    Zhuxy 评论0 收藏0
  • 爬虫框架WebMagic源码分析Selector

    摘要:主要用于选择器抽象类,实现类前面说的两个接口,主要用于选择器继承。多个选择的情形,每个选择器各自独立选择,将所有结果合并。抽象类,定义了一些模板方法。这部分源码就不做分析了。这里需要提到的一点是返回的不支持选择,返回的对象支持选择。 1、Selector部分:接口:Selector:定义了根据字符串选择单个元素和选择多个元素的方法。ElementSelector:定义了根据jsoup ...

    dongxiawu 评论0 收藏0
  • 爬虫框架WebMagic源码分析Selenium

    摘要:有一个模块其中实现了一个。但是感觉灵活性不大。接口如下它会获得一个实例,你可以在里面进行任意的操作。本部分到此结束。 webmagic有一个selenium模块,其中实现了一个SeleniumDownloader。但是感觉灵活性不大。所以我就自己参考实现了一个。 首先是WebDriverPool用来管理WebDriver池: import java.util.ArrayList; im...

    MarvinZhang 评论0 收藏0

发表评论

0条评论

TIGERB

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<