Python3中遇到UnicodeEncodeError: 'ascii' codec can't encode characters in position 25-45: ordinal not in range(128)

如果是python 2.x的话需要在文件中加上

# coding: utf-8
# Python 2 only: reload() and sys.setdefaultencoding() are not usable in Python 3.
import sys  # the snippet uses sys, so it must be imported first
reload(sys)
sys.setdefaultencoding("utf8")

但是Python3默认就使用utf8编码,按理不需要这些设置;即使设置了这些,也仍然不能正常打印。

最终查看了一下系统环境编码,发现问题(我的是ARM架构下的Ubuntu):

>>> import sys
>>> sys.stdout.encoding  
'ANSI_X3.4-1968'

解决办法

1、设置环境变量LANG

在linux或Mac上设置环境变量的方式一样,在/etc/profile增加一行
export LANG="en_US.UTF-8"
然后source /etc/profile

我当前系统默认已经是utf-8,采用第二种解决

2、使用PYTHONIOENCODING

在运行python命令前添加参数 PYTHONIOENCODING=utf-8 python3 api.py

该参数的解释可查看

https://docs.python.org/3.6/using/cmdline.html

使用ftplib模块,实现列出ftp上指定目录下的所有文件,包含子目录

# coding: utf-8
# for python36+ (f-strings are used below)

import os
import ftplib

class FtpTools(object):
    """Walk an FTP server's directory tree and tally directories, files and bytes.

    The connection is opened and logged in at construction time; call
    ``close()`` when finished.
    """

    def __init__(self, host, username, password):
        """Connect to ``host`` and log in with ``username`` / ``password``."""
        # Running totals accumulated by filelist().
        self.dir_sum = 0
        self.res_sum = 0
        self.size_sum = 0
        self.host = host
        self.username = username
        self.password = password
        # Select UTF-8 *before* connecting: ftplib sets up the reader for the
        # control connection while connecting, so assigning the encoding only
        # after FTP(host) can leave server replies (e.g. PWD) decoded with
        # the library default.
        self.ftp = ftplib.FTP()
        self.ftp.encoding = 'utf-8'
        self.ftp.connect(self.host)
        self.ftp.login(self.username, self.password)

    def filelist(self, ftp_dir):
        """Recursively print every file path under ``ftp_dir`` and update the totals.

        Each sub-directory increments ``dir_sum``; each other entry increments
        ``res_sum`` and adds its ``size`` fact to ``size_sum``. A sub-directory
        that cannot be entered or listed is skipped (it is still counted).
        """
        self.ftp.cwd(ftp_dir)
        # Ask the server for the absolute path once instead of once per entry.
        current = self.ftp.pwd()
        # FTP paths always use '/', whatever the client OS separator is.
        prefix = current if current.endswith('/') else current + '/'
        for name, facts in self.ftp.mlsd(".", ["type", "size"]):
            entry_type = facts.get("type", "").lower()
            if entry_type in ("cdir", "pdir"):
                # MLSD may list the directory itself and its parent; neither
                # is content of this directory.
                continue
            path = prefix + name
            if entry_type == "dir":
                self.dir_sum += 1
                try:
                    self.filelist(path)
                except ftplib.all_errors:
                    # Best effort: an unreadable sub-directory must not abort
                    # the rest of the walk.
                    pass
                # Return by absolute path so a failure deep inside the
                # sub-tree cannot leave us in the wrong directory.
                self.ftp.cwd(current)
            else:
                self.res_sum += 1
                # Servers are not required to report a size for every entry.
                self.size_sum += int(facts.get("size", 0))
                print(path)

    def count(self):
        """Print the totals gathered so far: directories, files and size in MB."""
        count_msg = f"文件夹: {self.dir_sum}, 文件数: {self.res_sum}, 总大小: {self.size_sum/1024/1024}/MB"
        print(count_msg)

    def close(self):
        """Send QUIT and close the FTP connection."""
        self.ftp.quit()


if __name__ == "__main__":
    # Demo connection settings; replace with a real server and account.
    host, username, password = "192.168.1.111", "test", "test.com"

    # Walk the whole server from the root, then print the summary line.
    ftptools = FtpTools(host=host, username=username, password=password)
    ftptools.filelist('/')
    ftptools.count()

结果

(py3) [root@mongodb ftptools]# python ftptools.py 
/bitnami-redmine-4.0.2-0-linux-x64-installer.run
/mysite-master/blogApp/admin.py
/mysite-master/blogApp/apps.py
/mysite-master/blogApp/models.py
/mysite-master/blogApp/serializers.py
/mysite-master/blogApp/tests.py
/mysite-master/blogApp/urls.py
/mysite-master/blogApp/views.py
/mysite-master/manage.py
/mysite-master/mysite/settings.py
/mysite-master/mysite/urls.py
/mysite-master/mysite/wsgi.py
/mysite-master/README.md
/mysite-master/requirments.txt
/mysite-master/同步.bat
/mysite-master/启动.bat
...
/procexp64.exe
/TeamViewer_Setup.exe
文件夹: 10, 文件数: 56, 总大小: 240.06631660461426/MB

利用ForgeryPy生成虚拟数据

在程序研发过程中,我们往往需要大量的虚拟实验数据。Python中有多个包可以用于生成虚拟数据,其中功能较为完善的是ForgeryPy。


0x01 安装

采用pip进行安装:

pip install ForgeryPy


0x02 模块

打开源码:

# -*- coding: utf-8 -*-
# Copyright (C) 2012 by Tomasz Wójcik <labs@tomekwojcik.pl>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""Easy to use generator of various forged data."""

from .forgery import address
from .forgery import basic
from .forgery import currency
from .forgery import date
from .forgery import internet
from .forgery import lorem_ipsum
from .forgery import name
from .forgery import personal

ForgeryPy包括了地理位置、日期、网络、名称等大量虚拟生成算法,非常方便我们用来生成虚拟数据。

0x03 例子

#coding:utf-8

import forgery_py

# Draw one fake value from each generator, in the order they are printed.
fake_city = forgery_py.address.city()                # location (city)
fake_color = forgery_py.basic.hex_color()            # random colour
fake_date = forgery_py.date.date(True)               # date
fake_email = forgery_py.internet.email_address()     # e-mail address
fake_name = forgery_py.name.full_name()              # person's full name
fake_company = forgery_py.name.company_name()        # company name
fake_about = forgery_py.lorem_ipsum.sentence()       # short description

# One value per line, same order as above.
print(
    fake_city,
    fake_color,
    fake_date,
    fake_email,
    fake_name,
    fake_company,
    fake_about,
    sep="\n",
)

# 结果
Daly City
19648C
2019-03-08
sean@twimm.info
David Brown
Zoombeat
Nunc purus.

更多好玩的去看源码

python36实现批量查询指定长度的字母数字域名是否被注册

1、配合万网的域名查询接口

2、python36实现批量查询指定长度的字母数字域名是否被注册

3、使用笛卡尔积实现排列组合拼接域名

4、因为接口请求有限制,就不用多线程来速战速决了

# coding: utf-8

import time
import string
import logging
import requests
import itertools
from xml.etree import ElementTree as ET

"""
- returncode=200 表示接口返回成功 
- key=*.com表示当前check的域名 
- original=210 : Domain name is available 表示域名可以注册 
- original=211 : Domain name is not available 表示域名已经注册 
- original=212 : Domain name is invalid 表示域名参数传输错误 
- original=213 : Time out 查询超时
"""

# Send every lookup result to domain.log with a timestamp and source location.
logging.basicConfig(
    filename='domain.log',
    level=logging.INFO,
    datefmt='%a, %d %b %Y %H:%M:%S',
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s - %(message)s',
)

# Availability-check endpoint; the domain to test is substituted into "{}".
domain_check_api = "http://panda.www.net.cn/cgi-bin/check.cgi?area_domain={}"

# Browser-like User-Agent sent with every request.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36'
    ),
}


def write_file(domain):
    """Append ``domain`` as a single line to ``domain.txt`` in the working directory."""
    line = "{}\n".format(domain)
    with open("domain.txt", "a+") as out:
        out.write(line)


def req_data(domain):
    """Query the availability API for ``domain`` and record the outcome.

    For each ``property`` node in the XML reply: when ``returncode`` is 200
    and ``original`` contains ``210`` the domain is appended to the results
    file via ``write_file`` and logged as available; otherwise it is logged
    as not registrable.

    Network, parsing and file errors are logged and the domain is skipped, so
    a single bad lookup does not abort a long batch. The function always
    pauses briefly afterwards to stay under the API's request limit.
    """
    try:
        result = requests.get(
            domain_check_api.format(domain),
            headers=headers,
            # Without a timeout one stalled connection would hang the whole batch.
            timeout=10,
        )
        root = ET.XML(result.text)
        for node in root.iter('property'):
            code = node.findtext('returncode')
            original = node.findtext('original')
            if not code or not original:
                # Reply does not have the expected fields; nothing to decide on.
                logging.warning(f"[{domain}]响应缺少returncode/original字段")
                continue
            if int(code) == 200 and '210' in original:
                write_file(domain)
                logging.info(f"[{domain}]可注册")
            else:
                logging.info(f"[{code}][{original}][{domain}]不可注册")
    except (requests.RequestException, OSError, ET.ParseError, ValueError) as exc:
        # RequestException: network failure; OSError: results file not
        # writable; ParseError: reply is not XML; ValueError: non-numeric code.
        logging.warning(f"[{domain}]查询失败: {exc!r}")
    finally:
        # Throttle after failures too, so errors do not turn into a request flood.
        time.sleep(0.1)


def main(sd, domain_length, domain_suffix):
    """Check every ``domain_length``-character name over alphabet ``sd`` for each suffix.

    Names are generated as the Cartesian product of ``sd`` with itself, and
    each resulting ``<name>.<suffix>`` is passed to ``req_data``.
    """
    for suffix in domain_suffix:
        labels = map("".join, itertools.product(sd, repeat=domain_length))
        for label in labels:
            req_data(f"{label}.{suffix}")


if __name__ == "__main__":
    # Alphabet the candidate names are built from; pick one of the options.
    sd = string.digits    # digits only
    # sd = string.ascii_lowercase # letters only
    # sd = string.digits+string.ascii_lowercase # digits and letters combined
    domain_length, domain_suffix = 4, ['cn', 'com.cn', 'com']
    main(sd=sd, domain_length=domain_length, domain_suffix=domain_suffix)

python itertools模块实现排列组合

一、笛卡尔积:itertools.product(*iterables[, repeat])

直接对自身进行笛卡尔积:

import itertools
# Cartesian product of 'ABCD' with itself: every ordered pair, repeats allowed.
for pair in map(''.join, itertools.product('ABCD', repeat=2)):
    print(pair, end=' ')

输出结果: 
AA AB AC AD BA BB BC BD CA CB CC CD DA DB DC DD 
print(''.join(i)) 这个语句可以让每个结果中的字符直接拼接到一起 
end=' ' 可以让默认的输出后换行变为一个空格

两个元组进行笛卡尔积:

import itertools
# Cartesian product of two different tuples: each item of `a` paired with each item of `b`.
a = 1, 2, 3
b = tuple('ABC')
c = itertools.product(a, b)
for i in c:
    print(i, end=' ')

输出结果: 
(1, 'A') (1, 'B') (1, 'C') (2, 'A') (2, 'B') (2, 'C') (3, 'A') (3, 'B') (3, 'C')

二、排列:itertools.permutations(iterable[, r])

import itertools
# All ordered arrangements of 2 distinct letters taken from 'ABCD'.
arrangements = map(''.join, itertools.permutations('ABCD', 2))
print(*arrangements, end=' ')

输出结果: 
AB AC AD BA BC BD CA CB CD DA DB DC

三、组合:itertools.combinations(iterable, r)

import itertools
# All unordered selections of 3 distinct letters from 'ABCD', one per line.
selections = map(''.join, itertools.combinations('ABCD', 3))
print(*selections, sep='\n')

输出结果: 
ABC 
ABD 
ACD 
BCD

四、组合(包含自身重复):itertools.combinations_with_replacement(iterable, r)

import itertools
# Unordered selections of 3 letters from 'ABCD' where a letter may be picked more than once.
for letters in itertools.combinations_with_replacement('ABCD', 3):
    word = ''.join(letters)
    print(word, end=' ')

输出结果: 
AAA AAB AAC AAD ABB ABC ABD ACC ACD ADD BBB BBC BBD BCC BCD BDD CCC CCD CDD DDD

使用python检查SSL证书到期情况

结合邮件告警和页面展示,再多的域名证书到期情况即可立马知道

# coding: utf-8 
# 查询域名证书到期情况

import re
import time
import subprocess
from datetime import datetime
from io import StringIO

def main(domain):
    """Print the TLS certificate validity details of ``domain`` as reported by curl.

    Runs ``curl -Ivs https://<domain>`` and extracts the start date, expiry
    date, common name and issuer from its verbose output, then prints them
    together with the number of whole days left until expiry.

    If ``domain`` is empty, curl cannot be run, or the certificate details
    are not found in curl's output, a message is printed and the function
    returns without raising.
    """
    # Local import: only `datetime` itself is imported at file level.
    from datetime import timezone

    if not domain:
        print('域名为空, 跳过检查')
        return

    # Argument list, no shell: the domain is passed to curl as plain data and
    # cannot inject extra shell commands.
    comm = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
    try:
        proc = subprocess.run(
            comm,
            stdout=subprocess.PIPE,
            # curl writes its verbose (-v) handshake details to stderr.
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            errors="replace",
        )
    except OSError as exc:
        print(f'域名: {domain} 无法执行curl: {exc}')
        return

    # NOTE(review): this pattern needs a "common name:" line in curl's output;
    # curl builds that print a "subject:" line instead will not match — confirm
    # against the curl version in use.
    m = re.search('start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n', proc.stdout, re.S)
    if m is None:
        print(f'域名: {domain} 未获取到证书信息')
        print('*'*30)
        return
    start_text, expire_text, common_name, issuer = m.groups()

    date_format = "%b %d %H:%M:%S %Y GMT"
    start_date = datetime.strptime(start_text, date_format)
    expire_date = datetime.strptime(expire_text, date_format)
    start_date_st = start_date.strftime("%Y-%m-%d %H:%M:%S")
    expire_date_st = expire_date.strftime("%Y-%m-%d %H:%M:%S")

    # The certificate dates are in GMT, so compare against the current UTC
    # time rather than local time.
    now_utc = datetime.now(timezone.utc)
    remaining = (expire_date.replace(tzinfo=timezone.utc) - now_utc).days

    print ('域名:', domain)
    print ('通用名:', common_name)
    print ('开始时间:', start_date_st)
    print ('到期时间:', expire_date_st)
    print (f'剩余时间: {remaining}天')
    print ('颁发机构:', issuer)
    print ('*'*30)

    # Small pause between successive lookups.
    time.sleep(0.5)

if __name__ == "__main__":
    # Domains whose certificates are checked, one report per domain.
    domains = ['www.01314.cn', 'www.51bbo.com'] 
    for domain in domains:
        main(domain)

结果

域名: www.01314.cn
通用名: www.01314.cn
开始时间: 2018-10-18 00:00:00
到期时间: 2019-10-18 12:00:00
剩余时间: 307天
颁发机构: Encryption Everywhere DV TLS CA - G1,OU=www.digicert.com,O=DigiCert Inc,C=US
******************************
域名: www.51bbo.com
通用名: 51bbo.com
开始时间: 2018-09-27 00:00:00
到期时间: 2019-09-27 12:00:00
剩余时间: 286天
颁发机构: Encryption Everywhere DV TLS CA - G1,OU=www.digicert.com,O=DigiCert Inc,C=US
******************************

采集soundcloud专辑

1、需要设置代理

2、滚屏加载更多歌曲

3、解析歌曲完整下载路径

import os
import time
import json
import requests
from selenium import webdriver
from scrapy.selector import Selector
from selenium.webdriver.support.ui import WebDriverWait

# Both plain and TLS requests go through the same local HTTP proxy.
proxies = dict.fromkeys(("http", "https"), "http://192.168.1.88:1088")

def music_download(url):
    """Download the file at ``url`` into the local ``music/`` directory.

    The file name is the last path segment of ``url`` with any query string
    removed. After saving, prints a success line with the size when the file
    is larger than 1 MB (decimal), otherwise a failure line.
    """
    file_name = url.split('?')[0].split('/')[-1]
    # Verify the same path that is written: the file is saved under music/,
    # so checking the bare file name would always report a failure.
    file_path = os.path.join("music", file_name)
    os.makedirs("music", exist_ok=True)

    # NOTE(review): the API requests in this file go through `proxies`, this
    # download does not — confirm whether that is intentional.
    r = requests.get(url, stream=True)
    try:
        with open(file_path, 'wb') as f:
            # Write chunk by chunk so the whole track is never held in memory.
            for chunk in r.iter_content(chunk_size=64 * 1024):
                f.write(chunk)
    finally:
        r.close()

    size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
    if size > 1*1000*1000:
        print(f"{file_name} download success, file size: {size/1000/1000}M")
    else:
        print (f"{file_name} download fail.")

def music_index(url):
    """Resolve a SoundCloud track page ``url`` to its stream URL and download it.

    First asks the resolve API for the track id, then asks the streams API
    for that id and hands the ``http_mp3_128_url`` entry to
    ``music_download``. Tracks without an id or without that stream entry
    are skipped; a failed download is reported and does not raise.
    """
    if not url:
        # An element without an href yields no URL; there is nothing to resolve.
        return

    client_id = "LvWovRaJZlWCHql0bISuum8Bd2KX79mb"
    # Pass the page URL via params so it is URL-encoded in the query string.
    r = requests.get(
        "https://api.soundcloud.com/resolve",
        params={"url": url, "client_id": client_id},
        proxies=proxies,
    )
    json_r = json.loads(r.text)
    sound_id = json_r.get('id')
    if sound_id is None:
        return

    sound_r = requests.get(
        f"https://api.soundcloud.com/i1/tracks/{sound_id}/streams",
        params={"client_id": client_id},
        proxies=proxies,
    )
    json_sound_r = json.loads(sound_r.text)
    print (f"当前任务ID: {sound_id}")

    music_download_url = json_sound_r.get('http_mp3_128_url')
    if not music_download_url:
        # No 128k MP3 stream offered for this track.
        return
    try:
        music_download(music_download_url)
    except (requests.RequestException, OSError) as exc:
        # Keep going with the next track, but say why this one failed.
        print (f"任务 {sound_id} 下载失败: {exc!r}")

def soundcloud_index():
    """Open the artist page in Chrome, load all tracks by scrolling, and download each.

    The browser is started with the proxy configured, the page is scrolled
    until its height stops growing, then every ``sound__coverArt`` link is
    passed to ``music_index``. The browser is always closed at the end.
    """
    url = "https://soundcloud.com/beyond-synth"
    chromeOptions = webdriver.ChromeOptions()
    # Route the browser through the proxy.
    chromeOptions.add_argument("--proxy-server=http://192.168.1.88:1088")
    # NOTE(review): `chrome_options=` and `find_elements_by_class_name` look
    # like the older Selenium API — confirm against the installed version.
    browser = webdriver.Chrome(chrome_options = chromeOptions)
    try:
        browser.get(url)

        # Scroll to the bottom repeatedly until the page no longer grows.
        js1 = 'return document.body.scrollHeight'
        js2 = 'window.scrollTo(0, document.body.scrollHeight)'
        old_scroll_height = 0
        while True:
            scroll_height = browser.execute_script(js1)
            if scroll_height <= old_scroll_height:
                # Height unchanged since the last scroll: nothing more was
                # loaded. (Comparing with >= here would loop forever.)
                break
            old_scroll_height = scroll_height
            browser.execute_script(js2)
            time.sleep(1)

        # Collect the links first, then process them one by one.
        content = browser.find_elements_by_class_name('sound__coverArt')
        track_urls = [c.get_attribute('href') for c in content]
        for count, single_url in enumerate(track_urls, 1):
            music_index(single_url)
            time.sleep(1)
            print (f"当前第 {count} 条")
    finally:
        # Close the browser even when a step above fails.
        browser.quit()
    
if __name__ == "__main__":
    # Script entry point: crawl the configured SoundCloud page and download its tracks.
    soundcloud_index()