Source code for zhihu.topic

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
from datetime import datetime

from .common import *
from .base import BaseZhihu


class Topic(BaseZhihu):

    """Topic class. Construct instances with the ``ZhihuClient.topic`` method."""

    @class_common_init(re_topic_url)
    def __init__(self, url, name=None, session=None):
        """Create a Topic instance.

        :param url: topic url
        :param name: topic name, optional
        :return: Topic
        """
        self.url = url
        self._session = session
        self._name = name
        self._id = int(re_topic_url.match(self.url).group(1))
    @property
    def id(self):
        """Get the topic id (the trailing number in the url).

        :return: topic id
        :rtype: int
        """
        return self._id

    @property
    @check_soup('_xsrf')
    def xsrf(self):
        """Get Zhihu's anti-xsrf parameter (ignore it if you don't need it).

        :return: xsrf parameter
        :rtype: str
        """
        return self.soup.find('input', attrs={'name': '_xsrf'})['value']

    @property
    @check_soup('_tid')
    def tid(self):
        """Internal topic id, occasionally needed.

        :return: internal topic id
        :rtype: int
        """
        return int(self.soup.find(
            'div', id='zh-topic-desc')['data-resourceid'])

    @property
    @check_soup('_name')
    def name(self):
        """Get the topic name.

        :return: topic name
        :rtype: str
        """
        return self.soup.find('h1').text

    @property
    def parents(self):
        """Get the parent topics of this topic.

        Note: no topic with many parent topics was available for testing,
        so this method may misbehave in some cases; feedback is welcome.

        :return: parent topics of this topic, as a generator
        :rtype: Topic.Iterable
        """
        self._make_soup()
        parent_topic_tag = self.soup.find('div', class_='parent-topic')
        if parent_topic_tag is None:
            yield []
        else:
            for topic_tag in parent_topic_tag.find_all('a'):
                yield Topic(Zhihu_URL + topic_tag['href'],
                            topic_tag.text.strip(),
                            session=self._session)

    @property
    def children(self):
        """Get the child topics of this topic.

        :return: child topics of this topic, as a generator
        :rtype: Topic.Iterable
        """
        self._make_soup()
        child_topic_tag = self.soup.find('div', class_='child-topic')
        if child_topic_tag is None:
            return []
        elif '共有' not in child_topic_tag.contents[-2].text:
            for topic_tag in child_topic_tag.div.find_all('a'):
                yield Topic(Zhihu_URL + topic_tag['href'],
                            topic_tag.text.strip(),
                            session=self._session)
        else:
            flag = 'load'
            child = ''
            data = {'_xsrf': self.xsrf}
            params = {
                'parent': self.id
            }
            while flag == 'load':
                params['child'] = child
                res = self._session.post(Topic_Get_Children_Url,
                                         params=params, data=data)
                j = map(lambda x: x[0], res.json()['msg'][1])
                *topics, last = j
                for topic in topics:
                    yield Topic(Zhihu_URL + '/topic/' + topic[2], topic[1],
                                session=self._session)
                flag = last[0]
                child = last[2]
            if flag == 'topic':
                yield Topic(Zhihu_URL + '/topic/' + last[2], last[1],
                            session=self._session)

    @property
    @check_soup('_follower_num')
    def follower_num(self):
        """Get the number of followers of the topic.

        :return: number of followers
        :rtype: int
        """
        follower_num_block = self.soup.find(
            'div', class_='zm-topic-side-followers-info')
        # When nobody follows the topic the block is missing, so return 0
        # (thanks to Zhihu user 段晓晨 for reporting this)
        if follower_num_block.strong is None:
            return 0
        return int(follower_num_block.strong.text)

    @property
    def followers(self):
        """Get the followers of the topic.

        :return: followers of the topic, as a generator
        :rtype: Author.Iterable
        """
        from .author import Author, ANONYMOUS

        self._make_soup()
        gotten_data_num = 20
        data = {
            '_xsrf': self.xsrf,
            'start': '',
            'offset': 0
        }
        while gotten_data_num == 20:
            res = self._session.post(
                Topic_Get_More_Follower_Url.format(self.id), data=data)
            j = res.json()['msg']
            gotten_data_num = j[0]
            data['offset'] += gotten_data_num
            soup = BeautifulSoup(j[1])
            divs = soup.find_all('div', class_='zm-person-item')
            for div in divs:
                h2 = div.h2
                url = Zhihu_URL + h2.a['href']
                name = h2.a.text
                motto = h2.parent.div.text.strip()
                try:
                    yield Author(url, name, motto, session=self._session)
                except ValueError:  # invalid url
                    yield ANONYMOUS
            data['start'] = int(re_get_number.match(divs[-1]['id']).group(1))

    @property
    @check_soup('_photo_url')
    def photo_url(self):
        """Get the url of the topic's avatar image.

        :return: avatar url of the topic
        :rtype: str
        """
        img = self.soup.find('a', id='zh-avartar-edit-form').img['src']
        return img.replace('_m', '_r')

    @property
    @check_soup('_description')
    def description(self):
        """Get the description of the topic.

        :return: topic description
        :rtype: str
        """
        desc = self.soup.find('div', class_='zm-editable-content').text
        return desc

    @property
    def top_authors(self):
        """Get the best answerers of the topic.

        :return: best answerers under this topic, usually 5 of them or none,
            as a generator
        :rtype: Author.Iterable
        """
        from .author import Author, ANONYMOUS

        self._make_soup()
        t = self.soup.find('div', id='zh-topic-top-answerer')
        if t is None:
            return
        for d in t.find_all(
                'div', class_='zm-topic-side-person-item-content'):
            url = Zhihu_URL + d.a['href']
            name = d.a.text
            motto = d.find('span', class_='bio')['title']
            try:
                yield Author(url, name, motto, session=self._session)
            except ValueError:  # invalid url
                yield ANONYMOUS

    @property
    def top_answers(self):
        """Get the highlighted (top) answers under the topic.

        :return: top answers under the topic, as a generator
        :rtype: Answer.Iterable
        """
        from .question import Question
        from .answer import Answer
        from .author import Author, ANONYMOUS

        top_answers_url = Topic_Top_Answers_Url.format(self.id)
        params = {'page': 1}
        while True:
            # Stop once we are past 50 pages
            if params['page'] > 50:
                return
            res = self._session.get(top_answers_url, params=params)
            params['page'] += 1
            soup = BeautifulSoup(res.content)
            # Fewer than 50 pages available: we hit an error page, so stop
            if soup.find('div', class_='error') is not None:
                return
            questions = soup.find_all('a', class_='question_link')
            answers = soup.find_all('a', class_='answer-date-link')
            authors = soup.find_all('div', class_='zm-item-answer-author-info')
            upvotes = soup.find_all('a', class_='zm-item-vote-count')
            for ans, up, q, au in zip(answers, upvotes, questions, authors):
                answer_url = Zhihu_URL + ans['href']
                question_url = Zhihu_URL + q['href']
                question_title = q.text.strip()
                upvote = up.text
                if upvote.isdigit():
                    upvote = int(upvote)
                else:
                    upvote = None
                question = Question(question_url, question_title,
                                    session=self._session)
                if au.a is None:
                    author = ANONYMOUS
                else:
                    author_url = Zhihu_URL + au.a['href']
                    author_name = au.a.text
                    author_motto = au.strong['title'] if au.strong else ''
                    author = Author(author_url, author_name, author_motto,
                                    session=self._session)
                yield Answer(answer_url, question, author, upvote,
                             session=self._session)

    @property
    def questions(self):
        """Get all questions under the topic (newest first).

        :return: all questions under the topic, as a generator
        :rtype: Question.Iterable
        """
        from .question import Question

        question_url = Topic_Questions_Url.format(self.id)
        params = {'page': 1}
        older_time_stamp = int(time.time()) * 1000
        while True:
            res = self._session.get(question_url, params=params)
            soup = BeautifulSoup(res.content)
            if soup.find('div', class_='error') is not None:
                return
            questions = soup.find_all('div', class_='question-item')
            questions = list(filter(
                lambda x: int(x.h2.span['data-timestamp']) < older_time_stamp,
                questions))
            for qu_div in questions:
                url = Zhihu_URL + qu_div.h2.a['href']
                title = qu_div.h2.a.text.strip()
                creation_time = datetime.fromtimestamp(
                    int(qu_div.h2.span['data-timestamp']) // 1000)
                yield Question(url, title, creation_time=creation_time,
                               session=self._session)
            older_time_stamp = int(questions[-1].h2.span['data-timestamp'])
            params['page'] += 1

    @property
    def unanswered_questions(self):
        """Get the questions awaiting answers under the topic.

        What an "awaiting answer" question is:
        https://www.zhihu.com/question/40470324

        :return: questions awaiting answers under the topic, as a generator
        :rtype: Question.Iterable
        """
        from .question import Question

        question_url = Topic_Unanswered_Question_Url.format(self.id)
        params = {'page': 1}
        while True:
            res = self._session.get(question_url, params=params)
            soup = BeautifulSoup(res.content)
            if soup.find('div', class_='error') is not None:
                return
            questions = soup.find_all('div', class_='question-item')
            for qu_div in questions:
                url = Zhihu_URL + qu_div.h2.a['href']
                title = qu_div.h2.a.text.strip()
                yield Question(url, title, session=self._session)
            params['page'] += 1

    @property
    def answers(self):
        """Get all answers under the topic (newest first).

        :return: all answers under the topic, as a generator
        :rtype: Answer.Iterable
        """
        from .question import Question
        from .answer import Answer
        from .author import Author, ANONYMOUS

        newest_url = Topic_Newest_Url.format(self.id)
        params = {'start': 0, '_xsrf': self.xsrf}
        res = self._session.get(newest_url)
        soup = BeautifulSoup(res.content)
        while True:
            divs = soup.find_all('div', class_='folding')
            # If the topic has no answers, return right away
            if len(divs) == 0:
                return
            last_score = divs[-1]['data-score']
            for div in divs:
                q = div.find('a', class_="question_link")
                question_url = Zhihu_URL + q['href']
                question_title = q.text.strip()
                question = Question(question_url, question_title,
                                    session=self._session)
                ans = div.find('a', class_='answer-date-link')
                answer_url = Zhihu_URL + ans['href']
                upvote = div.find('a', class_='zm-item-vote-count').text
                if upvote.isdigit():
                    upvote = int(upvote)
                else:
                    upvote = None
                au = div.find('div', class_='zm-item-answer-author-info')
                if au.a is None:
                    author = ANONYMOUS
                else:
                    author_url = Zhihu_URL + au.a['href']
                    author_name = au.a.text
                    author_motto = au.strong['title'] if au.strong else ''
                    author = Author(author_url, author_name, author_motto,
                                    session=self._session)
                yield Answer(answer_url, question, author, upvote,
                             session=self._session)
            params['offset'] = last_score
            res = self._session.post(newest_url, data=params)
            gotten_feed_num = res.json()['msg'][0]
            # If no content was returned, stop
            if gotten_feed_num == 0:
                return
            soup = BeautifulSoup(res.json()['msg'][1])

    @property
    def hot_questions(self):
        """Get the hot questions under the topic.

        :return: questions from the topic's hot feed, as a generator in
            order of hotness
        :rtype: Question.Iterable
        """
        from .question import Question

        hot_questions_url = Topic_Hot_Questions_Url.format(self.id)
        params = {'start': 0, '_xsrf': self.xsrf}
        res = self._session.get(hot_questions_url)
        soup = BeautifulSoup(res.content)
        while True:
            questions_duplicate = soup.find_all('a', class_='question_link')
            # If the topic has no questions, return right away
            if len(questions_duplicate) == 0:
                return
            # Remove duplicate questions
            questions = list(set(questions_duplicate))
            questions.sort(key=self._get_score, reverse=True)
            last_score = soup.find_all(
                'div', class_='feed-item')[-1]['data-score']
            for q in questions:
                question_url = Zhihu_URL + q['href']
                question_title = q.text.strip()
                question = Question(question_url, question_title,
                                    session=self._session)
                yield question
            params['offset'] = last_score
            res = self._session.post(hot_questions_url, data=params)
            gotten_feed_num = res.json()['msg'][0]
            # If no questions were returned, stop
            if gotten_feed_num == 0:
                return
            soup = BeautifulSoup(res.json()['msg'][1])

    @property
    def hot_answers(self):
        """Get the hot answers under the topic.

        :return: answers from the topic's hot feed, as a generator in
            order of hotness
        :rtype: Answer.Iterable
        """
        from .question import Question
        from .author import Author
        from .answer import Answer

        hot_questions_url = Topic_Hot_Questions_Url.format(self.id)
        params = {'start': 0, '_xsrf': self.xsrf}
        res = self._session.get(hot_questions_url)
        soup = BeautifulSoup(res.content)
        while True:
            answers_div = soup.find_all('div', class_='feed-item')
            last_score = answers_div[-1]['data-score']
            for div in answers_div:
                # A missing textarea means the answer has been censored
                if not div.textarea:
                    continue
                question_url = Zhihu_URL + div.h2.a['href']
                question_title = div.h2.a.text.strip()
                question = Question(question_url, question_title,
                                    session=self._session)
                author_link = div.find('a', class_='author-link')
                if not author_link:
                    author_url = None
                    author_name = '匿名用户'
                    author_motto = ''
                else:
                    author_url = Zhihu_URL + author_link['href']
                    author_name = author_link.text
                    author_motto_span = div.find('span', class_='bio')
                    author_motto = author_motto_span['title'] \
                        if author_motto_span else ''
                author = Author(author_url, author_name, author_motto,
                                session=self._session)
                body = div.find('div', class_='zm-item-rich-text')
                answer_url = Zhihu_URL + body['data-entry-url']
                upvote_num = int(div.find(
                    'div', class_='zm-item-vote-info')['data-votecount'])
                yield Answer(answer_url, question, author, upvote_num,
                             session=self._session)
            params['offset'] = last_score
            res = self._session.post(hot_questions_url, data=params)
            gotten_feed_num = res.json()['msg'][0]
            # If no answers were returned, stop
            if gotten_feed_num == 0:
                return
            soup = BeautifulSoup(res.json()['msg'][1])

    @staticmethod
    def _get_score(tag):
        # Sort key used by hot_questions: read the data-score attribute of
        # the feed item that encloses the given question link tag.
        h2 = tag.parent
        div = h2.parent
        try:
            _ = h2['class']
            return div['data-score']
        except KeyError:
            return div.parent.parent['data-score']
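
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module above).
# A minimal example of obtaining a Topic through ``ZhihuClient.topic`` as the
# class docstring suggests. The cookies file name and the topic url below are
# assumed placeholders; adapt them to your own login setup and topic.
if __name__ == '__main__':
    from zhihu import ZhihuClient

    client = ZhihuClient('cookies.json')  # assumed: login via a cookies file
    topic = client.topic('https://www.zhihu.com/topic/19550517')  # example url

    print(topic.name, topic.follower_num)

    # Properties such as parents, children, followers and answers are
    # generators; take a few items instead of exhausting them.
    for i, child in enumerate(topic.children):
        if i >= 5:
            break
        print(child.name)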