最近接到一个需求,需要获取高德地图上的门店信息。查了一下,官方提供了 API 接口(默认有 1000 次免费请求额度),300 块钱可以买 10 万次额度。这里记录一下实现方法(代码全是 ChatGPT 帮我写的),后续如果有市场调研可能可以用上。
主代码:
get_types.py
import requests
import json
import datetime
import pymysql
def _join_photo_urls(photos):
    """Return the photo URLs of one POI concatenated as ``url1;url2;``."""
    if not isinstance(photos, list):
        return ""
    return "".join(
        f"{photo['url']};"
        for photo in photos
        if isinstance(photo, dict) and photo.get('url')
    )


def _opening_hours(biz_ext):
    """Return ``opentime2`` and ``open_time`` of *biz_ext*, each followed by ``;``."""
    if not isinstance(biz_ext, dict):
        return ""
    parts = []
    for key in ('opentime2', 'open_time'):
        value = biz_ext.get(key)
        # Only string values can be concatenated.
        # NOTE(review): the API appears to send [] for empty fields - confirm.
        if isinstance(value, str):
            parts.append(value + ";")
    return "".join(parts)


def getData(apikey, keywords, types, city):
    """Fetch POIs from the Amap text-search API and store them in table ``gaode``.

    Requests pages 1-19 with 25 POIs per page, restricted to *city*, and
    inserts one row per POI. Paging stops at the first successful page that
    contains no POIs, so no API quota is spent on pages that cannot hold data.
    A failed page (HTTP status other than 200, or API ``status`` other than
    ``'1'``) is reported on stdout and the next page is tried.

    Args:
        apikey: Amap web-service key, sent as ``key``.
        keywords: Search keywords, sent as ``keywords``.
        types: POI type codes separated by ``|``, sent as ``types``.
        city: City/district code or name, sent as ``city``.

    Raises:
        requests.exceptions.RequestException: The HTTP request failed or did
            not answer within 30 seconds.
    """
    print(city, types, keywords)
    endpoint = 'https://restapi.amap.com/v3/place/text'
    columns = "name,address,location,type,pname,cityname,adname,tel,biz_type,photos,biz_ext,insert_time"
    # Values are bound by the driver, so quotes in names/addresses need no
    # manual escaping and cannot alter the statement.
    insert_sql = "INSERT INTO gaode (" + columns + ") VALUES (" + ", ".join(["%s"] * 12) + ")"
    text_fields = ('name', 'address', 'location', 'type', 'pname',
                   'cityname', 'adname', 'tel', 'biz_type')

    conn = None  # opened lazily, once, and only if there is something to store
    try:
        for page in range(1, 20):
            params = {
                'key': apikey,
                'keywords': str(keywords),
                'types': str(types),
                'city': str(city),
                'citylimit': 'true',
                'output': 'JSON',
                'offset': 25,
                'page': page,
                'extensions': 'base'
            }
            response = requests.get(endpoint, params=params, timeout=30)
            if response.status_code != 200:
                print('请求失败:', response.status_code)
                continue
            data = json.loads(response.text)
            if data['status'] != '1':
                print('请求失败:', data['info'])
                continue

            pois = data.get('pois') or []
            if not pois:
                # An empty page means every later page is empty as well.
                break

            if conn is None:
                conn = connect_db('gaode')
            now_time = datetime.datetime.now().strftime('%Y-%m-%d')
            with conn.cursor() as cursor:
                for poi in pois:
                    row = tuple(str(poi.get(field, '')) for field in text_fields) + (
                        _join_photo_urls(poi.get('photos')),
                        _opening_hours(poi.get('biz_ext')),
                        now_time,
                    )
                    print(row)
                    cursor.execute(insert_sql, row)
            conn.commit()
    finally:
        if conn is not None:
            conn.close()
def connect_db(db_name):
    """Open and return a new pymysql connection to the database *db_name*.

    Host, user and password are the placeholder values hard-coded below;
    the connection uses the ``utf8mb4`` character set.
    """
    return pymysql.connect(
        host='ip',
        user='user',
        password='password',
        db=db_name,
        charset='utf8mb4',
    )
def insertDb(conn, table, columns, values):
    """Insert one row into *table*, commit, and return the cursor's ``lastrowid``.

    Args:
        conn: Open DB-API connection, e.g. the one returned by ``connect_db``.
        table: Table name. Interpolated into the SQL text, so it must come
            from trusted code, never from external data.
        columns: Comma-separated column list. Interpolated like *table*.
        values: Either a ready-made SQL value list as a string (legacy form,
            interpolated verbatim - the caller is responsible for quoting), or
            a list/tuple of raw values. Raw values are bound through ``%s``
            placeholders so the driver escapes them; prefer this form for
            anything that originates outside the program.

    Returns:
        The ``lastrowid`` reported by the cursor after the insert.
    """
    if isinstance(values, str):
        sql = f"INSERT INTO {table} ({columns}) VALUES ({values})"
        args = None
    else:
        args = tuple(values)
        placeholders = ", ".join(["%s"] * len(args))
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    cursor = conn.cursor()
    try:
        if args is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, args)
        conn.commit()
        return cursor.lastrowid
    finally:
        # Release the cursor even when execute/commit raises.
        cursor.close()
if __name__ == '__main__':
    apikey = "xxxxx"

    # Industry POI type codes; the full table can be downloaded from the
    # official Amap API site.
    # 060600 Shopping services | Home & building-materials market | Home & building-materials market
    # 060601 Shopping services | Home & building-materials market | General furniture & building-materials market
    # 060602 Shopping services | Home & building-materials market | Furniture mall
    # 060603 Shopping services | Home & building-materials market | Building-materials & hardware market
    # 060604 Shopping services | Home & building-materials market | Kitchen & bathroom market
    # 060605 Shopping services | Home & building-materials market | Fabric market
    # 060606 Shopping services | Home & building-materials market | Lighting & ceramics market
    types = "060600|060601|060602|060603|060604|060605|060606"
    # types = "060600"

    # Search keywords. Other candidates: ['门窗', '全屋', '家居', '家具']
    keywords_list = ['装修']

    # One region id per line. Blank lines are skipped so that no request is
    # sent with an empty city.
    with open('poicode_full', 'r', encoding='utf-8') as file:
        cities = [line.strip() for line in file if line.strip()]

    # Alternative input: iterate a province -> cities mapping instead of the
    # region-id file.
    for keywords in keywords_list:
        for city in cities:
            getData(apikey, keywords, types, city)
poicode_full:区域 id 列表,内容从高德官方 API 获取,一行一个(这里只列举一部分):
110101
110102
110105
110106
110107
110108
110109
110111
110112
110113
建表语句:
-- One row per POI written by the scraper (get_types.py).
-- The AUTO_INCREMENT=1498 table option from the original dump is omitted on
-- purpose so that ids in a freshly created table start at 1.
CREATE TABLE `gaode` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  `location` varchar(255) DEFAULT NULL,
  `type` varchar(255) DEFAULT NULL,
  `pname` varchar(255) DEFAULT NULL,
  `cityname` varchar(255) DEFAULT NULL,
  `adname` varchar(255) DEFAULT NULL,
  `tel` varchar(255) DEFAULT NULL,
  `biz_type` varchar(255) DEFAULT NULL,
  -- photo URLs, each followed by ';'
  `photos` text,
  -- opening-hours strings, each followed by ';'
  `biz_ext` text,
  -- date on which the row was scraped
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
顺便薅了一套前端页面,可筛选可统计,效果如下:
代码目录结构如下:
app.py
import json
from flask import Flask, render_template, request, url_for
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func
from urllib.parse import quote_plus

app = Flask(__name__)

# Database settings are read from config.json in the working directory.
with open('config.json', encoding='utf-8') as f:
    config = json.load(f)

db_user = config['DB_USER']
db_password = config['DB_PASSWORD']
db_host = config['DB_HOST']
db_name = config['DB_NAME']

# Credentials are URL-quoted so that characters such as '@', ':' or '/' in
# them cannot break the URI. charset=utf8mb4 matches the table definition so
# that Chinese text is transferred without loss.
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f'mysql://{quote_plus(db_user)}:{quote_plus(db_password)}'
    f'@{db_host}/{db_name}?charset=utf8mb4'
)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['WTF_CSRF_ENABLED'] = False

db = SQLAlchemy(app)
class Gaode(db.Model):
    """ORM mapping of the ``gaode`` table: one row per scraped POI."""

    # Same name Flask-SQLAlchemy derives from the class name, spelled out.
    __tablename__ = 'gaode'

    id = db.Column(db.Integer, primary_key=True)

    # Short text attributes of the POI.
    name = db.Column(db.String(length=255))
    address = db.Column(db.String(length=255))
    location = db.Column(db.String(length=255))
    type = db.Column(db.String(length=255))
    pname = db.Column(db.String(length=255))
    cityname = db.Column(db.String(length=255))
    adname = db.Column(db.String(length=255))
    tel = db.Column(db.String(length=255))
    biz_type = db.Column(db.String(length=255))

    # Unbounded text attributes.
    photos = db.Column(db.Text)
    biz_ext = db.Column(db.Text)

    insert_time = db.Column(db.DateTime)
@app.route('/', methods=['GET'])
def index():
    """Render a paginated POI table, optionally filtered by province name.

    Query parameters:
        page: 1-based page number (default 1), 200 rows per page.
        pname: Substring that ``Gaode.pname`` must contain. When empty, no
            filter is applied.

    The template receives ``pnames`` (``(pname, count)`` pairs for the filter
    buttons), ``results`` (rows of the current page), ``total_results``,
    ``total_pages``, ``next_url`` and ``prev_url``.
    """
    page = request.args.get('page', 1, type=int)
    pname = request.args.get('pname', '')

    pnames = db.session.query(Gaode.pname, func.count(Gaode.pname)).group_by(Gaode.pname).all()

    query = Gaode.query
    if pname:
        # Filter only on request: LIKE '%%' would silently drop rows whose
        # pname is NULL. autoescape makes '%' and '_' in the user input match
        # literally instead of acting as wildcards.
        query = query.filter(Gaode.pname.contains(pname, autoescape=True))
    # A fixed order keeps rows from moving between pages across requests.
    results = query.order_by(Gaode.id).paginate(page=page, per_page=200, error_out=False)

    total_results = results.total
    total_pages = results.pages
    next_url = url_for('index', page=results.next_num, pname=pname) if results.has_next else None
    prev_url = url_for('index', page=results.prev_num, pname=pname) if results.has_prev else None
    return render_template('index.html', pnames=pnames, results=results.items, total_results=total_results, total_pages=total_pages, next_url=next_url, prev_url=prev_url)
if __name__ == '__main__':
    import os

    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it execute arbitrary code. Debug mode is therefore
    # off unless explicitly requested with FLASK_DEBUG=1.
    debug = os.environ.get('FLASK_DEBUG', '') == '1'
    app.run(host='0.0.0.0', debug=debug)
config.json
{
"DB_USER": "user",
"DB_PASSWORD": "password",
"DB_HOST": "ip",
"DB_NAME": "gaode"
}
index.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="utf-8">
    <title>Search</title>
    <style>
        table {
            border-collapse: collapse;
            width: 100%;
        }
        th, td {
            border: 1px solid black;
            padding: 8px;
            text-align: left;
        }
        tr:nth-child(even) {
            background-color: #f2f2f2;
        }
        .button-container {
            display: flex;
            flex-wrap: wrap;
            gap: 10px;
        }
        /* Filter links are styled as buttons; a <button> nested inside an
           <a> is not valid HTML. */
        .button-container a {
            text-decoration: none;
            white-space: nowrap;
            padding: 2px 8px;
            border: 1px solid #767676;
            border-radius: 3px;
            background-color: #efefef;
            color: black;
        }
    </style>
</head>
<body>
    {# One filter link per province name, with its row count. #}
    <div class="button-container">
        {% for pname, count in pnames %}
        <a href="{{ url_for('index', pname=pname) }}">{{ pname }} <span>({{ count }})</span></a>
        {% endfor %}
    </div>
    <p>Total results: {{ total_results }}</p>
    <p>Total pages: {{ total_pages }}</p>
    <table>
        <tr>
            <th>ID</th>
            <th>Name</th>
            <th>Address</th>
            <th>Location</th>
            <th>Type</th>
            <th>Pname</th>
            <th>Cityname</th>
            <th>Adname</th>
            <th>Tel</th>
            <th>Biz Type</th>
            <th>Photos</th>
            <th>Biz Ext</th>
            <th>Insert Time</th>
        </tr>
        {% for result in results %}
        <tr>
            <td>{{ result.id }}</td>
            <td>{{ result.name }}</td>
            <td>{{ result.address }}</td>
            <td>{{ result.location }}</td>
            <td>{{ result.type }}</td>
            <td>{{ result.pname }}</td>
            <td>{{ result.cityname }}</td>
            <td>{{ result.adname }}</td>
            <td>{{ result.tel }}</td>
            <td>{{ result.biz_type }}</td>
            <td>{{ result.photos }}</td>
            <td>{{ result.biz_ext }}</td>
            <td>{{ result.insert_time }}</td>
        </tr>
        {% endfor %}
    </table>
    {# Pagination links are only rendered when the neighbouring page exists. #}
    {% if prev_url %}
    <a href="{{ prev_url }}">Previous Page</a>
    {% endif %}
    {% if next_url %}
    <a href="{{ next_url }}">Next Page</a>
    {% endif %}
</body>
</html>
评论区