基于线性回归模型预测体彩大乐透和排列五开奖号码

事先声明:线性回归模型无法预测随机数,本文以学习为主,图一乐。

整体思路:先爬取开奖数据,存储到Excel表格或者数据库中,然后每天爬取最新的一条开奖数据进行更新。取近七次开奖数据(数据量过大时预测结果会趋于一个固定值),以期号和开奖时间作为特征变量、开奖号码作为目标变量,创建线性回归模型并拟合数据。最后用最新的期号和开奖时间预测开奖号码。

爬取数据

大乐透

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import requests
import collections
import json
import time
import pandas as pd

# Scraper for the gameNo=85 draw history: pages through the sporttery web API
# and writes every draw to an Excel workbook.

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36'
}

columns = ['开奖日期', '期号', '前区', '后区']

# The page size is used both in the request URL and to detect the last page,
# so it is defined once.
DLT_PAGE_SIZE = 30
DLT_REQUEST_TIMEOUT = 10  # seconds; without a timeout a stalled server hangs the script
DLT_HISTORY_URL = 'https://webapi.sporttery.cn/gateway/lottery/getHistoryPageListV1.qry?gameNo=85&provinceId=0&pageSize={}&isVerify=1&pageNo={}'
# Raw string: '\m' and '\d' are not valid escape sequences in a normal literal.
DLT_OUTPUT_PATH = r'D:\my\dlt.xlsx'


def parse_dlt_page(payload):
    """Turn one decoded API response into a list of row tuples.

    Args:
        payload: The decoded JSON object of one history page.

    Returns:
        A list of ``(draw_time, draw_number, front, back)`` tuples, in the
        order the draws appear in ``payload['value']['list']``. ``front`` is
        the first 14 characters of ``lotteryDrawResult`` and ``back`` is its
        last 5 characters.

    Raises:
        RuntimeError: If the response carries no ``value`` object.
    """
    value = payload.get('value')
    if not isinstance(value, dict):
        raise RuntimeError('unexpected API response: missing "value" object')

    rows = []
    for draw in value.get('list') or []:
        result = draw.get('lotteryDrawResult')
        rows.append((
            draw.get('lotteryDrawTime'),
            draw.get('lotteryDrawNum'),
            result[:14],
            result[-5:],
        ))
    return rows


def fetch_dlt_page(page_no):
    """Download and decode history page *page_no* (1-based)."""
    url = DLT_HISTORY_URL.format(DLT_PAGE_SIZE, page_no)
    response = requests.get(url, headers=header, timeout=DLT_REQUEST_TIMEOUT)
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    return json.loads(response.content.decode('utf-8'))


def main():
    """Fetch every history page, export the rows to Excel, print the runtime."""
    starttime = time.time()

    rows = []
    page_no = 1
    while True:
        page_rows = parse_dlt_page(fetch_dlt_page(page_no))
        rows.extend(page_rows)
        # A short page means there is no further page to request.
        if len(page_rows) < DLT_PAGE_SIZE:
            break
        page_no += 1

    df = pd.DataFrame(rows, columns=columns)
    df.to_excel(DLT_OUTPUT_PATH)

    elapsed_time = time.time() - starttime
    print("耗时:", elapsed_time, "秒")


if __name__ == '__main__':
    main()

排列五

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import requests
import collections
import json
import time
import mysql.connector
import pandas as pd

# Scraper for the gameNo=350133 draw history: pages through the sporttery web
# API, inserts every draw into the MySQL table `array5`, and also exports the
# rows to an Excel workbook.

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36'
}

columns = ['期号', '开奖日期', '开奖号码', '中奖注数', '中奖金额(元)', '销售额(元)', '奖池金额(元)']

# The page size is used both in the request URL and to detect the last page,
# so it is defined once.
PLW_PAGE_SIZE = 30
PLW_REQUEST_TIMEOUT = 10  # seconds; without a timeout a stalled server hangs the script
PLW_HISTORY_URL = 'https://webapi.sporttery.cn/gateway/lottery/getHistoryPageListV1.qry?gameNo=350133&provinceId=0&pageSize={}&isVerify=1&pageNo={}'
PLW_INSERT_SQL = "INSERT INTO `array5`(`issue`, `lottery_date`, `lottery_num`, `lottery_count`, `lottery_money`, `sales`, `pond`) VALUES (%s, %s, %s, %s, %s, %s, %s) "
# Raw string: '\m' and '\p' are not valid escape sequences in a normal literal.
PLW_OUTPUT_PATH = r'D:\my\plw.xlsx'


def parse_plw_page(payload):
    """Turn one decoded API response into a list of row tuples.

    Args:
        payload: The decoded JSON object of one history page.

    Returns:
        A list of 7-tuples ``(issue, draw_time, draw_result, stake_count,
        stake_amount, total_sales, pool_balance)`` in response order. The
        stake count and amount are read from the first entry of
        ``prizeLevelList``.

    Raises:
        RuntimeError: If the response carries no ``value`` object.
    """
    value = payload.get('value')
    if not isinstance(value, dict):
        raise RuntimeError('unexpected API response: missing "value" object')

    rows = []
    for draw in value.get('list') or []:
        first_level = draw.get('prizeLevelList')[0]
        rows.append((
            draw.get('lotteryDrawNum'),
            draw.get('lotteryDrawTime'),
            draw.get('lotteryDrawResult'),
            first_level.get('stakeCount'),
            first_level.get('stakeAmount'),
            draw.get('totalSaleAmount'),
            draw.get('poolBalanceAfterdraw'),
        ))
    return rows


def fetch_plw_page(page_no):
    """Download and decode history page *page_no* (1-based)."""
    url = PLW_HISTORY_URL.format(PLW_PAGE_SIZE, page_no)
    response = requests.get(url, headers=header, timeout=PLW_REQUEST_TIMEOUT)
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    return json.loads(response.content.decode('utf-8'))


def main():
    """Store the full draw history in MySQL and Excel, then print the runtime."""
    starttime = time.time()

    # Connect with the mysql-connector-python driver (fill in the credentials).
    cnx = mysql.connector.connect(
        host='',
        user='',
        password='',
        database=''
    )
    cursor = cnx.cursor()

    rows = []
    try:
        page_no = 1
        while True:
            page_rows = parse_plw_page(fetch_plw_page(page_no))
            for item in page_rows:
                print("item:", item)
                cursor.execute(PLW_INSERT_SQL, item)
            # One commit per page instead of one per row.
            cnx.commit()
            rows.extend(page_rows)
            # A short page means there is no further page to request.
            if len(page_rows) < PLW_PAGE_SIZE:
                break
            page_no += 1
    finally:
        # Release the database resources even when a request or insert fails.
        cursor.close()
        cnx.close()

    df = pd.DataFrame(rows, columns=columns)
    df.to_excel(PLW_OUTPUT_PATH)

    endtime = time.time()
    print("耗时:", endtime - starttime, "秒")


if __name__ == '__main__':
    main()

预测开奖号码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import requests
import json
from datetime import date
import mysql.connector
from sqlalchemy import create_engine
import pandas as pd
from sklearn.linear_model import LinearRegression

# HTTP headers sent with every request to the lottery web API.
header = dict([
    ('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36'),
])

# Reference date for the whole run: used to decide how many draws to fetch
# and as the date feature of the prediction.
today = date.today()

# Open the MySQL connection through mysql-connector-python
# (the credentials are left blank and must be filled in).
db_settings = dict(host='', user='', password='', database='')
cnx = mysql.connector.connect(**db_settings)


def _shared_connection():
    """Hand SQLAlchemy the already-open mysql.connector connection."""
    return cnx


# SQLAlchemy engine backed by that same connection, for use with pandas.
engine = create_engine('mysql+mysqlconnector://', creator=_shared_connection)
cursor = cnx.cursor()


def days_between_dates(start_date, end_date):
    """Return the number of whole days from *start_date* to *end_date*.

    Args:
        start_date: A ``datetime.date`` or ``datetime.datetime``.
        end_date: A ``datetime.date`` or ``datetime.datetime``.

    Returns:
        ``(end_date - start_date).days``; negative when *end_date* is earlier.
        When exactly one argument is a ``datetime`` it is reduced to its
        calendar date first, because Python refuses to subtract a ``date``
        from a ``datetime`` (and vice versa) with ``TypeError``.
    """
    from datetime import datetime

    start_has_time = isinstance(start_date, datetime)
    end_has_time = isinstance(end_date, datetime)
    if start_has_time != end_has_time:
        # Mixed inputs: compare calendar dates. Same-type inputs are left
        # untouched so their result is exactly what plain subtraction gives.
        if start_has_time:
            start_date = start_date.date()
        else:
            end_date = end_date.date()

    return (end_date - start_date).days


# Bring the `array5` table up to date before predicting.
#
# 1. Read the newest stored draw.
# 2. If it is older than today, fetch the most recent draws from the API and
#    insert the ones that are not stored yet.
# 3. Re-read the newest stored draw so that `issueLast` / `lotteryDateLast`
#    describe the table AFTER the update. Without this step the prediction
#    would target an issue that has just been inserted.
latest_draw_sql = 'SELECT lottery_date,issue,lottery_num FROM `array5` ORDER BY lottery_date DESC LIMIT 1'

cursor.execute(latest_draw_sql)
result = cursor.fetchall()
if not result:
    # An empty table would otherwise surface as a bare IndexError below.
    raise RuntimeError('table `array5` is empty; load the draw history first')
lotteryDateLast = result[0][0]
issueLast = result[0][1]

# Number of days since the newest stored draw.
days = days_between_dates(lotteryDateLast, today)

if days > 0:
    # Ask for as many recent draws as days have passed.
    url = 'https://webapi.sporttery.cn/gateway/lottery/getHistoryPageListV1.qry?gameNo=350133&provinceId=0&pageSize={}&isVerify=1&pageNo=1'.format(
        days)
    response = requests.get(url, headers=header, timeout=10)
    response.raise_for_status()
    content = response.content.decode('utf-8')
    js = json.loads(content)
    numbers = js.get('value')
    lists = numbers.get('list')

    insert_sql = "INSERT INTO `array5`(`issue`, `lottery_date`, `lottery_num`, `lottery_count`, `lottery_money`, `sales`, `pond`) VALUES (%s, %s, %s, %s, %s, %s, %s) "
    # The loop variable is deliberately not called `list`, which would
    # shadow the builtin for the rest of the script.
    for draw in lists:
        issue = draw.get('lotteryDrawNum')
        cursor.execute(
            'SELECT count(1) AS count FROM `array5` WHERE issue = %s', [issue])
        count = cursor.fetchone()
        if count[0] == 0:
            first_level = draw.get('prizeLevelList')[0]
            item = (
                issue,
                draw.get('lotteryDrawTime'),
                draw.get('lotteryDrawResult'),
                first_level.get('stakeCount'),
                first_level.get('stakeAmount'),
                draw.get('totalSaleAmount'),
                draw.get('poolBalanceAfterdraw'),
            )  # row to insert
            cursor.execute(insert_sql, item)
    cnx.commit()

    # Refresh the "latest draw" values now that new rows may exist.
    cursor.execute(latest_draw_sql)
    result = cursor.fetchall()
    lotteryDateLast = result[0][0]
    issueLast = result[0][1]


def predict_num(num):
    """Predict one digit of the next draw with a linear regression.

    The seven most recent rows of `array5` are loaded, a model is fitted with
    the issue number and the draw date (as a Unix timestamp) as features and
    the selected character of `lottery_num` as target, and the model is then
    evaluated for issue ``issueLast + 1`` on ``today``.

    Args:
        num: 1-based character position inside `lottery_num`, passed to SQL
            ``SUBSTRING``. The caller in this file passes 1, 3, 5, 7 and 9,
            presumably because the digits are space-separated (assumption,
            verify against the stored data).

    Returns:
        The rounded prediction as an ``int``, clamped to the range 0-9 so the
        result is always a single digit.
    """
    # int() guarantees that only a number is ever formatted into the SQL text.
    query = "SELECT issue, lottery_date, SUBSTRING(lottery_num, {}, 1) AS lottery_num FROM array5 ORDER BY lottery_date DESC LIMIT 7".format(
        int(num))
    data = pd.read_sql(query, engine)

    # Build the feature frame from scratch instead of assigning into a slice
    # of `data`: that avoids pandas' chained-assignment warning and leaves
    # both columns with a real numeric dtype.
    X = pd.DataFrame({
        'issue': pd.to_numeric(data['issue']),
        'lottery_date': pd.to_datetime(data['lottery_date']).apply(lambda ts: ts.timestamp()),
    })
    # SUBSTRING yields text; convert it explicitly before fitting.
    y = pd.to_numeric(data['lottery_num'])

    # Create the linear regression model and fit it.
    model = LinearRegression()
    model.fit(X, y)

    # Feature row for the draw to predict. int() tolerates `issue` being
    # stored as text in the database.
    new_data = pd.DataFrame(
        {'issue': [int(issueLast) + 1], 'lottery_date': [today]})
    new_data['lottery_date'] = pd.to_datetime(
        new_data['lottery_date']).apply(lambda ts: ts.timestamp())
    new_X = new_data[['issue', 'lottery_date']]

    prediction = model.predict(new_X)
    rounded_prediction = int(round(prediction[0], 0))
    # A linear extrapolation can leave the valid digit range; clamp it.
    return max(0, min(9, rounded_prediction))


# Predict the characters at positions 1, 3, 5, 7 and 9 of `lottery_num` and
# join them with single spaces.
predicted_digits = [predict_num(position) for position in (1, 3, 5, 7, 9)]
randNums = ' '.join(map(str, predicted_digits))
next_issue = issueLast + 1

# Store the prediction unless the same number is already recorded for the
# same issue.
cursor.execute(
    'SELECT count(1) AS count FROM `array5_rand` WHERE random_num = %s AND issue = %s', [randNums, next_issue])
(already_stored,) = cursor.fetchone()
if already_stored == 0:
    cursor.execute(
        "INSERT INTO `array5_rand`(`random_num`, `issue`) VALUES (%s, %s) ",
        (randNums, next_issue))
    cnx.commit()

# NOTE(review): the original indentation was lost; the lookup and the report
# below are assumed to run unconditionally (not only after an insert). Confirm.
# Count how often the predicted number has been drawn before and report it.
cursor.execute(
    'SELECT count(1) AS count FROM `array5` WHERE lottery_num = %s', [randNums])
(times_drawn,) = cursor.fetchone()
print("开奖号码为:", randNums, "|| 此号码历史上共开出", times_drawn, "次")


cursor.close()  # close the cursor
cnx.close()  # close the connection