Image-URL crawler: uses Selenium to dynamically scrape Instagram image URLs from a third-party website.

"""
Crawl the URLs of an Instagram user's images from a third-party website.
Date: 8/2020
Author: Yu
"""
import os
import random
import re
import threading
import time

import requests
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

base_url = 'https://imginn.com/'
uri = 'https://www.instagram.com/graphql/query/?query_hash=15bf78a4ad24e33cbd838fdb31353ac1&variables=%7B%22id%22%3A%22{user_id}%22%2C%22first%22%3A12%2C%22after%22%3A%22{cursor}%22%7D'
proxy = {
    'http': 'http://127.0.0.1:1080',
    'socks': 'socks://127.0.0.1:1088',
    'https': 'http://127.0.0.1:1080'
}
shortcode_save_path = '/media/hhc/ssd/ins_ids/shortcode'
display_url_save_path = '/media/hhc/ssd/ins_ids/urls/'
ids_dir = '/media/hhc/ssd/ins_ids/ins_20200728_ids/'
threads = []


class Crawler_Shortcode(threading.Thread):
    def __init__(self, spiderID):
        threading.Thread.__init__(self)
        self.spiderID = spiderID

    def run(self):
        if not os.path.exists(ids_dir):
            os.makedirs(ids_dir)
        print("thread " + str(self.spiderID) + " start!")
        with open(f'{ids_dir}ins_ids_{self.spiderID}.txt', 'r', encoding='utf-8') as f:
            user_names = f.read().splitlines()
        # Names that previously failed (not found, private, too few images).
        error_users = []
        error_path = f'{ids_dir}error_user_{self.spiderID}.txt'
        if os.path.exists(error_path):
            with open(error_path, 'r', encoding='utf-8') as g:
                error_users = g.read().splitlines()
        driver = webdriver.Firefox()
        driver.get(base_url)
        count = 0
        for user_name in user_names:
            user_name = user_name.strip()
            print(f'username:{self.spiderID} {user_name}')
            # Skip names that are already downloaded, empty, or known to fail.
            if (os.path.exists(f"{display_url_save_path}{user_name}_image_url.txt")
                    or not user_name or user_name in error_users):
                continue
            count += 1
            time.sleep(3)
            # Take a long pause every 25 users to avoid rate limiting.
            if count % 25 == 0:
                time.sleep(300 + float(random.randint(1, 1000)) / 100)
            samples = self.get_urls(driver, user_name, times=1)
            # The user has no images (or the page could not be fetched).
            if not samples:
                self.write_txt(error_path, user_name)
                continue
            # Only store URLs for users with more than 25 images.
            if len(samples) > 25:
                threadLock.acquire()
                with open(f"{display_url_save_path}{user_name}_image_url.txt", "a", encoding='utf-8') as out:
                    for sample in samples:
                        # Trim the "&dl=1" download suffix. The original
                        # str.strip('&dl=1') was a bug: it removes any of the
                        # characters &, d, l, =, 1 from both ends of the URL.
                        if sample.endswith('&dl=1'):
                            sample = sample[:-len('&dl=1')]
                        out.write(sample + '\n')
                print("Thread: " + str(self.spiderID) + " urls: " + str(len(samples)) + " " + user_name + " done!!>_<!!")
                threadLock.release()
            else:
                self.write_txt(error_path, user_name)
                continue
        print("thread " + str(self.spiderID) + " done!")

    # Dynamically navigate to the user's page and collect image URLs.
    def get_urls(self, driver, user_name, times):
        try:
            samples = []
            driver.find_element(By.NAME, "q").clear()
            driver.find_element(By.NAME, "q").send_keys(user_name)
            time.sleep(1 + float(random.randint(1, 300)) / 100)
            driver.find_element(By.XPATH, '/html/body/div[1]/div/div[2]/form/button').click()
            time.sleep(6 + float(random.randint(1, 300)) / 100)
            # Check whether the user exists.
            html = driver.page_source
            if re.findall('USERS [(]0[)]', html, re.S):
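                # "USERS (0)" in the search results means imginn.com found no
                # matching account, so record the name and give up on it.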
                self.write_txt(f'{ids_dir}error_user_{self.spiderID}.txt', user_name)
                return 0
            # Wait for the search results to appear, then open the first hit.
            WebDriverWait(driver, 30, 0.5).until(EC.presence_of_element_located(
                (By.XPATH, '/html/body/div[2]/div[3]/div[1]/a[1]')))
            driver.find_element(By.XPATH, '/html/body/div[2]/div[3]/div[1]/a[1]').click()
            # Wait for the user page to load.
            time.sleep(5 + float(random.randint(1, 200)) / 100)
            # Check whether we are allowed to view this user.
            html = driver.page_source
            if re.findall('You visit a private account', html, re.S) or re.findall('Page Not Found', html, re.S):
                return 0
            WebDriverWait(driver, 10, 0.5).until(EC.presence_of_element_located(
                (By.XPATH, '/html/body/div[2]/div[2]/div[1]/img')))
            # Simulate scrolling so the page lazy-loads more images.
            for i in range(8):
                ActionChains(driver).send_keys(Keys.PAGE_DOWN).perform()
                time.sleep(3 + float(random.randint(1, 100)) / 100)
            # Extract the image URLs from the rendered HTML.
            html = etree.HTML(driver.page_source)
            samples = self.img_urls(html)
            time.sleep(20 + float(random.randint(1, 100)) / 100)
            return samples
        except Exception as e:
            trytimes = 3
            print(str(self.spiderID) + " : " + str(e))
            time.sleep(100 + float(random.randint(1, 1000)) / 100)
            times += 1
            # If three attempts all fail, restart the browser session.
            if times > trytimes:
                driver.close()
                self.run()
                return 0
            # The original did not return here, so retried results were lost.
            return self.get_urls(driver, user_name, times)

    # Extract image URLs from the user page, skipping videos.
    def img_urls(self, html):
        urls = html.xpath('//div[@class="item"]/div[@class="action"]/a/@href')
        return [url for url in urls if 'mp4' not in url]

    def write_txt(self, path, user_name):
        with open(path, 'a') as f:
            f.write(user_name + '\n')

    # Legacy helper from the requests-based approach; unused by the Selenium flow.
    def get_html(self, headers, url):
        try:
            response = requests.get(url, headers=headers, proxies=proxy)
            if response.status_code == 200:
                return response.text
            print('Request failed, status code:', response.status_code)
        except Exception as e:
            print(e)
            time.sleep(2)
        return None

    # Legacy helper for Instagram's GraphQL endpoint (see `uri`); retries until
    # it gets a 200 response. Unused by the Selenium flow.
    def get_json(self, headers, url):
        try:
            time.sleep(30 + float(random.randint(1, 1000)) / 100)
            response = requests.get(url, headers=headers, proxies=proxy)
            if response.status_code == 200:
                return response.json()
            print('JSON request failed, status code:', response.status_code)
            time.sleep(20 + float(random.randint(1, 4000)) / 100)
            return self.get_json(headers, url)
        except Exception as e:
            print(e)
            time.sleep(20 + float(random.randint(1, 4000)) / 100)
            return self.get_json(headers, url)


def main(i):
    crawler = Crawler_Shortcode(i)
    threads.append(crawler)
    return threads


if __name__ == '__main__':
    thread_num = 10
    threadLock = threading.Lock()
    for i in range(thread_num):
        threads = main(i)
    for t in threads:
        time.sleep(float(random.randint(1, 500)) / 100)
        t.daemon = True
        t.start()
    for t in threads:
        t.join()
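
# ----------------------------------------------------------------------
# A minimal standalone sketch (not invoked by the crawler above) showing
# the same lxml XPath extraction that img_urls() performs, applied to an
# inline HTML fragment. The fragment below is a hypothetical imitation of
# imginn.com's markup, assumed here purely for illustration.
def _demo_img_urls():
    sample_html = '''
    <div class="item"><div class="action">
      <a href="https://example.com/photo1.jpg?x=1&amp;dl=1">download</a>
    </div></div>
    <div class="item"><div class="action">
      <a href="https://example.com/clip.mp4?x=1&amp;dl=1">download</a>
    </div></div>
    '''
    tree = etree.HTML(sample_html)
    hrefs = tree.xpath('//div[@class="item"]/div[@class="action"]/a/@href')
    # Keep images only and trim the "&dl=1" suffix, as run() does before
    # writing the URLs to disk.
    return [h[:-len('&dl=1')] if h.endswith('&dl=1') else h
            for h in hrefs if 'mp4' not in h]

# _demo_img_urls() -> ['https://example.com/photo1.jpg?x=1']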