Python3中遇到UnicodeEncodeError: ‘ascii’ codec can’t encode characters in position 25-45: ordinal not in range(128)

如果是python 2.x的话需要在文件中加上

# coding: utf-8 
reload(sys)
sys.setdefaultencoding("utf8")

但是Python3应当默认就使用utf8编码即使设置了这些也仍然不能正常打印。

最终查看了一下系统环境编码,发现问题(我的是ARM架构下的Ubuntu

>>> import sys
>>> sys.stdout.encoding  
'ANSI_X3.4-1968'

解决办法

1、设置环境变量LANG

在linux或Mac上设置环境变量的方式一样,在/etc/profile增加一行
export LANG=”en_US.UTF-8″
然后source /etc/profile

我当前系统默认已经是utf-8,采用第二种解决

2、使用PYTHONIOENCODING

在运行python命令前添加参数 PYTHONIOENCODING=utf-8 python3 api.py

该参数的解释可查看

https://docs.python.org/3.6/using/cmdline.html

使用ftplib模块,实现列出ftp上指定目录下的所有文件,包含子目录

# coding: utf-8
# for python35+

import os
import ftplib

class FtpTools(object):
    def __init__(self, host, username, password):
        self.dir_sum = 0
        self.res_sum = 0
        self.size_sum = 0
        self.host = host
        self.username = username
        self.password = password
        self.ftp = ftplib.FTP(self.host)
        self.ftp.login(self.username, self.password)
        self.ftp.encoding='utf-8'

    def filelist(self, ftp_dir):
        self.ftp.cwd(ftp_dir)
        for name, facts in self.ftp.mlsd(".",["type", "size"]):
            if facts["type"] == "dir":
                if self.ftp.pwd().endswith('/'):
                    dir_cwd = self.ftp.pwd() + name
                else:
                    dir_cwd = self.ftp.pwd() + os.sep + name
                try:
                    self.dir_sum += 1
                    self.filelist(dir_cwd)
                    self.ftp.cwd('..')
                except:
                    pass
            else:
                self.res_sum += 1
                self.size_sum += int(facts["size"])
                if self.ftp.pwd().endswith('/'):
                    res_path = self.ftp.pwd() + name
                else:
                    res_path = self.ftp.pwd() + os.sep + name
                print (res_path)
        
    def count(self):
        count_msg = f"文件夹: {self.dir_sum}, 文件数: {self.res_sum}, 总大小: {self.size_sum/1024/1024}/MB"
        print (count_msg)

    def close():
        self.ftp.quit()


if __name__ == "__main__":
    host = "192.168.1.111"
    username = "test"
    password = "test.com"

    ftptools = FtpTools(host, username, password)
    ftptools.filelist('/')
    ftptools.count()

结果

(py3) [root@mongodb ftptools]# python ftptools.py 
/bitnami-redmine-4.0.2-0-linux-x64-installer.run
/mysite-master/blogApp/admin.py
/mysite-master/blogApp/apps.py
/mysite-master/blogApp/models.py
/mysite-master/blogApp/serializers.py
/mysite-master/blogApp/tests.py
/mysite-master/blogApp/urls.py
/mysite-master/blogApp/views.py
/mysite-master/manage.py
/mysite-master/mysite/settings.py
/mysite-master/mysite/urls.py
/mysite-master/mysite/wsgi.py
/mysite-master/README.md
/mysite-master/requirments.txt
/mysite-master/同步.bat
/mysite-master/启动.bat
...
/procexp64.exe
/TeamViewer_Setup.exe
文件夹: 10, 文件数: 56, 总大小: 240.06631660461426/MB

利用ForgeryPy生成虚拟数据

在程序研发过程中,我们往往需要大量的虚拟实验数据。Python中有多个包可以用于生成虚拟数据,其中功能较为完善的是ForgeryPy。


0x01 安装

采用pip进行安装:

pip install ForgeryPy


0x02 模块

打开源码:

# -*- coding: utf-8 -*-
# Copyright (C) 2012 by Tomasz Wójcik <labs@tomekwojcik.pl>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""Easy to use generator of various forged data."""

from .forgery import address
from .forgery import basic
from .forgery import currency
from .forgery import date
from .forgery import internet
from .forgery import lorem_ipsum
from .forgery import name
from .forgery import personal

ForgeryPy包括了地理位置、日期、网络、名称等大量虚拟生成算法,非常方便我们用来生成虚拟数据。

0x03 例子

#coding:utf-8

import forgery_py

#地理信息(城市)
city=forgery_py.address.city()
#随机颜色
color=forgery_py.basic.hex_color()
#时间
data=forgery_py.date.date(True)
#电子邮箱
email=forgery_py.internet.email_address()
#姓名
name=forgery_py.name.full_name()
#公司
company=forgery_py.name.company_name()
#简介
about=forgery_py.lorem_ipsum.sentence()

print(city)
print(color)
print(data)
print(email)
print(name)
print(company)
print(about)

# 结果
Daly City
19648C
2019-03-08
sean@twimm.info
David Brown
Zoombeat
Nunc purus.

更多好玩的去看源码

python36实现批量查询指定长度的字母数字域名是否被注册

1、配合万网的域名查询接口

2、python36实现批量查询指定长度的字母数字域名是否被注册

3、使用笛卡尔积实现排列组合拼接域名

4、因为接口请求有限制,就不用多线程来速战速决了

# coding: utf-8

import time
import string
import logging
import requests
import itertools
from xml.etree import ElementTree as ET

"""
- returncode=200 表示接口返回成功 
- key=*.com表示当前check的域名 
- original=210 : Domain name is available 表示域名可以注册 
- original=211 : Domain name is not available 表示域名已经注册 
- original=212 : Domain name is invalid 表示域名参数传输错误 
- original=213 : Time out 查询超时
"""

logging.basicConfig(level=logging.INFO,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s - %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='domain.log')

domain_check_api = "http://panda.www.net.cn/cgi-bin/check.cgi?area_domain={}"
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36'}


def write_file(domain):
    with open("domain.txt", "a+") as f:
        f.write(f"{domain}\n")


def req_data(domain):
    try:
        result = requests.get(domain_check_api.format(domain), headers=headers)
        root = ET.XML(result.text)
        for node in root.iter('property'):
            code = node.find('returncode').text
            original = node.find('original').text
            if int(code) == 200 and '210' in original:
                write_file(domain)
                logging.info(f"[{domain}]可注册")
            else:
                logging.info(f"[{code}][{original}][{domain}]不可注册")
        time.sleep(0.1)
    except:
        pass


def main(sd, domain_length, domain_suffix):
    for ds in domain_suffix:
        for x in itertools.product(sd, repeat = domain_length):
            domain = f"{''.join(x)}.{ds}"
            req_data(domain)


if __name__ == "__main__":
    sd = string.digits    # 纯数字
    # sd = string.ascii_lowercase # 纯字母
    # sd = string.digits+string.ascii_lowercase # 数字字母组合
    domain_length = 4
    domain_suffix = ['cn', 'com.cn', 'com']
    main(sd, domain_length, domain_suffix)

python itertools模块实现排列组合

一、笛卡尔积:itertools.product(*iterables[, repeat])

直接对自身进行笛卡尔积:

import itertools
for i in itertools.product('ABCD', repeat=2):
    print (''.join(i),end=' ')

输出结果: 
AA AB AC AD BA BB BC BD CA CB CC CD DA DB DC DD 
print (”.join(i))这个语句可以让结果直接排列到一起 
end=”可以让默认的输出后换行变为一个空格

两个元组进行笛卡尔积:

import itertools
a = (1, 2, 3)
b = ('A', 'B', 'C')
c = itertools.product(a,b)
for i in c:
    print(i,end=' ')

输出结果: 
(1, ‘A’) (1, ‘B’) (1, ‘C’) (2, ‘A’) (2, ‘B’) (2, ‘C’) (3, ‘A’) (3, ‘B’) (3, ‘C’)

二、排列:itertools.permutations(iterable[, r])

import itertools
for i in itertools.permutations('ABCD', 2):
    print (''.join(i),end=' ')

输出结果: 
AB AC AD BA BC BD CA CB CD DA DB DC

三、组合:itertools.combinations(iterable, r)

import itertools
for i in itertools.combinations('ABCD', 3):
    print (''.join(i))

输出结果: 
ABC 
ABD 
ACD 
BCD

四、组合(包含自身重复):itertools.combinations_with_replacement(iterable, r)

import itertools
for i in itertools.combinations_with_replacement('ABCD', 3):
    print (''.join(i),end=' ')

输出结果: 
AAA AAB AAC AAD ABB ABC ABD ACC ACD ADD BBB BBC BBD BCC BCD BDD CCC CCD CDD DDD

使用python检查SSL证书到期情况

结合邮件告警和页面展示,再多的域名证书到期情况即可立马知道

# coding: utf-8 
# 查询域名证书到期情况

import re
import time
import subprocess
from datetime import datetime
from io import StringIO

def main(domain):
    f = StringIO()
    comm = f"curl -Ivs https://{domain} --connect-timeout 10"

    result = subprocess.getstatusoutput(comm)
    f.write(result[1])

    m = re.search('start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n', f.getvalue(), re.S)
    start_date = m.group(1)
    expire_date = m.group(2)
    common_name = m.group(3)
    issuer = m.group(4)

    # time 字符串转时间数组
    start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
    start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
    # datetime 字符串转时间数组
    expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
    expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")

    # 剩余天数
    remaining = (expire_date-datetime.now()).days

    print ('域名:', domain)
    print ('通用名:', common_name)
    print ('开始时间:', start_date_st)
    print ('到期时间:', expire_date_st)
    print (f'剩余时间: {remaining}天')
    print ('颁发机构:', issuer)
    print ('*'*30)

    time.sleep(0.5)

if __name__ == "__main__":
    domains = ['www.01314.cn', 'www.51bbo.com'] 
    for domain in domains:
        main(domain)

结果

域名: www.01314.cn
通用名: www.01314.cn
开始时间: 2018-10-18 00:00:00
到期时间: 2019-10-18 12:00:00
剩余时间: 307天
颁发机构: Encryption Everywhere DV TLS CA - G1,OU=www.digicert.com,O=DigiCert Inc,C=US
******************************
域名: www.51bbo.com
通用名: 51bbo.com
开始时间: 2018-09-27 00:00:00
到期时间: 2019-09-27 12:00:00
剩余时间: 286天
颁发机构: Encryption Everywhere DV TLS CA - G1,OU=www.digicert.com,O=DigiCert Inc,C=US
******************************

采集soundcloud专辑

1、需要设置代理

2、滚屏加载更多歌曲

3、解析歌曲完整下载路径

import os
import time
import json
import requests
from selenium import webdriver
from scrapy.selector import Selector
from selenium.webdriver.support.ui import WebDriverWait

proxies = {
    "http": "http://192.168.1.88:1088",
    "https": "http://192.168.1.88:1088",
}

def music_download(url):
    file_name = url.split('?')[0].split('/')[-1]
    r = requests.get(url, stream=True)
    with open(f"music/{file_name}", 'wb') as f:
        f.write(r.content)
    if os.path.exists(file_name) and os.path.getsize(file_name) > 1*1000*1000:
        print(f"{file_name} download success, file size: {os.path.getsize(file_name)/1000/1000}M")
    else:
        print (f"{file_name} download fail.")

def music_index(url):
    url_desc_api = f"https://api.soundcloud.com/resolve?url={url}&client_id=LvWovRaJZlWCHql0bISuum8Bd2KX79mb"
    r = requests.get(url_desc_api, proxies=proxies)
    json_r = json.loads(r.text)
    sound_id = json_r['id']
    if not sound_id is None:
        url_download_api = f"https://api.soundcloud.com/i1/tracks/{sound_id}/streams?client_id=LvWovRaJZlWCHql0bISuum8Bd2KX79mb"
        sound_r = requests.get(url_download_api, proxies=proxies)
        json_sound_r = json.loads(sound_r.text)
        print (f"当前任务ID: {sound_id}")
        try:
            music_download_url = json_sound_r['http_mp3_128_url']
            if music_download_url:
                music_download(music_download_url)
        except:
            pass

def soundcloud_index():
    url = "https://soundcloud.com/beyond-synth"
    chromeOptions = webdriver.ChromeOptions()
    # 加入代理功能
    chromeOptions.add_argument(f"--proxy-server=http://192.168.1.88:1088")
    browser = webdriver.Chrome(chrome_options = chromeOptions)     
    browser.get(url)

    # 等待滚屏到最后
    js1 = 'return document.body.scrollHeight'
    js2 = 'window.scrollTo(0, document.body.scrollHeight)'
    old_scroll_height = 0
    while browser.execute_script(js1) >= old_scroll_height:
        old_scroll_height = browser.execute_script(js1)
        browser.execute_script(js2) 
        time.sleep(1)
    # 开始处理页面
    content = browser.find_elements_by_class_name('sound__coverArt')
    count = 1
    for c in content:
        single_url = c.get_attribute('href')
        music_index(single_url)
        time.sleep(1)
        print (f"当前第 {count} 条")
        count += 1
    # 结束任务
    browser.quit()
    
if __name__ == "__main__":
    soundcloud_index()

python pdb调试以及sublime3快捷键设置

sublime设置

sublime设置快捷键F5为运行,Ctrl+F5调试。就会对python调试方便很多。

  • Package Control中下载SublimeREPL(Read-Eval-Print-Loop)
  • 首选项 -> 按键绑定-用户 进行设置 

[
  {
    "keys": [
      "f5"
    ],
    "caption": "SublimeREPL: Python - RUN current file",
    "command": "run_existing_window_command",
    "args": {
      "id": "repl_python_run",
      "file": "config/Python/Main.sublime-menu"
    }
  },
  {
    "keys": [
      "ctrl+f5"
    ],
    "caption": "SublimeREPL: Python - PDB current file",
    "command": "run_existing_window_command",
    "args":
    {
        "id": "repl_python_pdb",
        "file": "config/Python/Main.sublime-menu"
    }
  }
]

pdb调试

c(continue): 继续执行,直到下一断点
w(where): 显示当前正在执行的代码行的上下文信息
a(args): 打印当前函数的参数列表
s(step): 执行当前代码行,并停在第一个能停的地方(相当于单步进入)
n(next): 继续执行到当前函数的下一行,或者当前行直接返回(单步跳过)
l(list): 列表,会打印出一个友好的总结,它能够显示出此刻你在代码中的位置
b(break): 动态添加断点,
q(quit): 结束

更多信息直接输入help查看

单步跳过(next)和单步进入(step)的区别在于, 单步进入会进入当前行调用的函数内部并停在里面, 而单步跳过会(几乎)全速执行完当前行调用的函数,并停在当前函数的下一行。

一个有意思的Web API框架:apistar

Github地址:https://github.com/encode/apistar

Install API Star:

$ pip3 install apistar

Create a new project in app.py:

from apistar import App, Route

def welcome(name=None):
    if name is None:
        return {'message': 'Welcome to API Star!'}
    return {'message': 'Welcome to API Star, %s!' % name}

routes = [
    Route('/', method='GET', handler=welcome),
]

app = App(routes=routes)

if __name__ == '__main__':
    app.serve('0.0.0.0', 5000, debug=True)

Run app.py

(py3) [root@localhost star]# python app.py
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 684-460-626

Open http://localhost:5000/docs/ in your browser:

针对无限制滚动网站的采集

模拟滚动

from selenium import webdriver
from scrapy.selector import Selector
from selenium.webdriver.support.ui import WebDriverWait
browser = webdriver.Chrome() browser.get(url)
wait = WebDriverWait(browser, 10)
wait.until(lambda dr: dr.find_element_by_class_name('project-detail').is_displayed())

# 一直滚动到最底部
js1 = 'return document.body.scrollHeight'
js2 = 'window.scrollTo(0, document.body.scrollHeight)'
old_scroll_height = 0
while browser.execute_script(js1) >= old_scroll_height:
    old_scroll_height = browser.execute_script(js1)
    browser.execute_script(js2) 
    time.sleep(1)

接下里就可以进行采集了