网络搭建:

mynn.py:

import torch
from torch import nn
class mynn(nn.Module):
    """Fully connected network taking 3520 input features and emitting 3 values.

    Stages ``layer1`` .. ``layer10`` are each
    ``Linear -> BatchNorm1d -> ReLU(inplace)``; ``layer11`` is a single
    ``Linear`` with no normalisation or activation.
    """

    # Number of input features read by ``layer1``.
    _INPUT_WIDTH = 3520
    # Output width of layer1 .. layer10, in order.
    _HIDDEN_WIDTHS = (4096, 4096, 4096, 4096, 3072, 2048, 1024, 256, 64, 32)
    # Output width of the final linear stage (layer11).
    _OUTPUT_WIDTH = 3

    def __init__(self):
        super().__init__()
        fan_in = self._INPUT_WIDTH
        # Stages are registered in ascending order under the attribute names
        # layer1 .. layer10 so that parameter names and order stay fixed.
        for position, fan_out in enumerate(self._HIDDEN_WIDTHS, start=1):
            stage = nn.Sequential(
                nn.Linear(fan_in, fan_out),
                nn.BatchNorm1d(fan_out),
                nn.ReLU(True),
            )
            setattr(self, "layer{}".format(position), stage)
            fan_in = fan_out
        self.layer11 = nn.Sequential(
            nn.Linear(fan_in, self._OUTPUT_WIDTH)
        )

    def forward(self, x):
        """Apply layer1 through layer11 in order and return the result."""
        for position in range(1, len(self._HIDDEN_WIDTHS) + 2):
            x = getattr(self, "layer{}".format(position))(x)
        return x

Dataset重定义:
mydataset.py

import os
from torch.utils import data
import numpy as np
from astropy.io import fits
from torchvision import transforms as T
from PIL import Image
import pandas as pd

class mydataset(data.Dataset):
    """Dataset backed by a comma-separated numeric file, one sample per row.

    Within each row the last column is the label and every preceding column
    is a feature value.
    """

    def __init__(self, csv_file, root_dir=None, transform=None):
        """Load the whole CSV file into memory.

        Args:
            csv_file: Path of the comma-delimited numeric file to load.
            root_dir: Stored on the instance; not used by this class.
            transform: Stored on the instance; not used by this class.
        """
        # Handing the path to NumPy lets it open *and close* the file; the
        # previous open(csv_file, "rb") handle was never closed.  ndmin=2
        # keeps a one-row file two-dimensional, so len() counts samples and
        # row indexing works instead of raising on a 1-D array.
        self.landmarks_frame = np.loadtxt(csv_file, delimiter=",", ndmin=2)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) loaded from the file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Return ``(label, features)`` for row ``idx``.

        Note the order: the label comes first, the feature vector second.
        """
        row = self.landmarks_frame[idx, :]
        return row[-1], row[:-1]
主程序:
main.py
import torch
from torch import nn, optim
from torchvision import datasets, transforms
from torch.autograd import Variable
#from models import Mynet, my_AlexNet, my_VGG
from sdata import mydataset
import time
import numpy as np
from model import mynn
if __name__ == '__main__':  # the '__main__' guard is required once DataLoader uses num_workers > 0
    # Train on the GPU when one is present and fall back to the CPU
    # otherwise.  Previously every batch was moved to 'cuda' unconditionally,
    # which crashed on machines without CUDA.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Pinned host memory only helps (and is only available) with CUDA.
    use_pinned_memory = device.type == 'cuda'

    data_train = mydataset.mydataset(csv_file="G:\\DATA\\train.csv", root_dir=None, transform=None)
    data_test = mydataset.mydataset(csv_file="G:\\DATA\\test.csv", root_dir=None, transform=None)
    data_loader_train = torch.utils.data.DataLoader(dataset=data_train,
                                                    batch_size=256,
                                                    shuffle=True,
                                                    num_workers=0,
                                                    pin_memory=use_pinned_memory)
    # Evaluation only counts correct predictions, so sample order is irrelevant.
    data_loader_test = torch.utils.data.DataLoader(dataset=data_test,
                                                   batch_size=256,
                                                   shuffle=False,
                                                   num_workers=0,
                                                   pin_memory=use_pinned_memory)
    print("**dataloader done**")
    model = mynn.mynn().to(device)

    # Loss function
    criterion = nn.CrossEntropyLoss()
    # Optimiser
    optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=1e-4)
    n_epochs = 1000

    s_time = time.time()

    for epoch in range(n_epochs):
        print('Epoch {}/{}'.format(epoch, n_epochs))

        # ---- training pass ----
        model.train()  # BatchNorm must use batch statistics while training
        running_loss = 0.0
        running_correct = 0
        # The dataset yields (label, features), in that order.
        for label, datafit in data_loader_train:
            x_train = datafit.float().to(device)
            y_train = label.long().to(device)

            optimizer.zero_grad()
            outputs = model(x_train)
            loss = criterion(outputs, y_train)
            loss.backward()
            optimizer.step()

            # CrossEntropyLoss returns the mean over the batch; weighting it
            # by the batch size makes the epoch total divisible by the number
            # of samples, giving a true per-sample average below.
            running_loss += loss.item() * y_train.size(0)
            running_correct += (outputs.argmax(dim=1) == y_train).sum().item()

        # ---- evaluation pass ----
        # eval() switches BatchNorm to its running statistics so test batches
        # neither influence the prediction of their neighbours nor leak into
        # the running averages; no_grad() skips building the autograd graph.
        model.eval()
        testing_correct = 0
        with torch.no_grad():
            for label, datafit in data_loader_test:
                x_test = datafit.float().to(device)
                y_test = label.long().to(device)
                outputs = model(x_test)
                testing_correct += (outputs.argmax(dim=1) == y_test).sum().item()

        print('Loss is:{:.4f}, Train Accuracy is:{:.4f}%, Test Accuracy '
              'is:{:.4f}'.format(running_loss / len(data_train),
                                 100 * running_correct / len(data_train),
                                 100 * testing_correct / len(data_test)))

    e_time = time.time()
    print('time_run is :', e_time - s_time)
    print('*******done******')

将天文数据写入csv中:
main.py
# -*- coding: utf-8 -*-
"""
Spyder Editor

This is a temporary script file.
"""

import matplotlib.pyplot as plt
from astropy.io import fits
import os
import matplotlib
matplotlib.use('Qt5Agg')
from astropy.io import fits
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.decomposition import PCA
def getData(fitPath, cla):
    """Build a labelled spectrum matrix from every ``.fit`` file in a directory.

    For each file the primary HDU's first data row is cut to a window of 3521
    samples.  The window starts at the pixel ``i`` for which
    ``wave[i] <= 4000 <= wave[i + 1]``, where
    ``wave[i] = 10 ** (COEFF0 + i * COEFF1)`` is computed from the header
    (pixel 0 is used when no such pixel exists).  The last sample of the
    window is then overwritten with the class label.

    Args:
        fitPath: Directory to scan (not recursive).
        cla: Numeric class label written into column 3520 of every row.

    Returns:
        Array of shape ``(n_files, 3521)``; columns 0..3519 hold spectrum
        samples and column 3520 holds ``cla``.  When the directory contains
        no ``.fit`` file an empty ``(0, 3521)`` array is returned (previously
        a fabricated row of ones was returned, and a single file produced a
        1-D array).

    Raises:
        IndexError: If a spectrum has fewer than 3521 samples from the window
            start onward.
    """
    row_width = 3521          # 3520 spectrum samples + 1 label column
    split_wavelength = 4000   # wavelength at which the window starts

    # Sorted so the row order does not depend on the arbitrary order in which
    # the operating system lists the directory.
    fit_paths = []
    for name in sorted(os.listdir(fitPath)):
        candidate = os.path.join(fitPath, name)
        if name.lower().endswith(".fit") and os.path.isfile(candidate):
            fit_paths.append(candidate)

    rows = []
    for fit_file in fit_paths:
        # The context manager closes the FITS file; np.array copies the row
        # so it remains valid after the file is closed.
        with fits.open(fit_file) as hdul:
            header = hdul[0].header
            naxis1 = header['NAXIS1']   # number of pixels in the spectrum
            coeff0 = header['COEFF0']
            coeff1 = header['COEFF1']
            flux = np.array(hdul[0].data[0, :])

        # Wavelength of every pixel, then the last pixel whose interval
        # [wave[i], wave[i + 1]] contains the split wavelength.
        wave = 10 ** (coeff0 + np.arange(naxis1) * coeff1)
        crossings = np.flatnonzero(
            (wave[:-1] <= split_wavelength) & (wave[1:] >= split_wavelength)
        )
        start = int(crossings[-1]) if crossings.size else 0

        spec = flux[start:start + row_width]
        if spec.shape[0] < row_width:
            raise IndexError(
                "{}: only {} samples available from pixel {}, need {}".format(
                    fit_file, spec.shape[0], start, row_width))
        spec[row_width - 1] = cla   # the label replaces the final sample
        rows.append(spec)

    if not rows:
        return np.empty((0, row_width))
    # One stack at the end instead of re-copying the matrix for every file.
    return np.vstack(rows)

if __name__ == '__main__':
    # (directory, class label) pairs, in the order their rows are stacked in
    # the output files.  Raw strings avoid the invalid escape sequences
    # ("\D", "\S", "\Q", "\G") that the plain literals contained; the
    # resulting paths are unchanged.
    class_dirs = (
        (r"G:\DATA\STAR", 0),
        (r"G:\DATA\QSO", 1),
        (r"G:\DATA\GALAXY", 2),
    )

    train_parts = []
    test_parts = []
    for directory, label in class_dirs:
        samples = getData(directory, label)
        # Each class is split 90/10 on its own, always with the same seed.
        part_train, part_test = train_test_split(samples, test_size=0.1, random_state=0)
        train_parts.append(part_train)
        test_parts.append(part_test)

    # np.vstack replaces the deprecated np.row_stack alias.
    x_train = np.vstack(train_parts)
    x_test = np.vstack(test_parts)
    np.savetxt("G:\\DATA\\train.csv", x_train, delimiter=',')
    np.savetxt("G:\\DATA\\test.csv", x_test, delimiter=',')

贝叶斯分类
# -*- coding: utf-8 -*-
"""
Spyder Editor

This is a temporary script file.
"""

import matplotlib.pyplot as plt
from astropy.io import fits
import os
import matplotlib
from sklearn.decomposition import PCA
matplotlib.use('Qt5Agg')
from astropy.io import fits
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.decomposition import PCA
from sklearn.naive_bayes import GaussianNB
from sklearn.naive_bayes import MultinomialNB
from sklearn.naive_bayes import BernoulliNB
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

def getData(path, cla):
    """Load min-max-scaled spectra and their labels from a directory.

    For each ``.fit`` file the primary HDU's first data row is cut to a
    window of 3520 samples.  The window starts at the pixel ``i`` for which
    ``wave[i] <= 4000 <= wave[i + 1]``, where
    ``wave[i] = 10 ** (COEFF0 + i * COEFF1)`` is computed from the header
    (pixel 0 is used when no such pixel exists).  The window is then scaled
    linearly so that its minimum maps to 0 and its maximum to 1.

    Args:
        path: Directory to scan (not recursive).
        cla: Class label assigned to every spectrum found.

    Returns:
        Tuple ``(x, y)``: ``x`` has one row per file and ``y`` is a 1-D array
        holding ``cla`` once per row.  When no ``.fit`` file is found, ``x``
        is an empty ``(0, 3520)`` array and ``y`` is empty (previously ``x``
        was a vector of ones that did not match ``y``).

    Raises:
        ValueError: Raised by ``np.vstack`` if the windows do not all have
            the same length (a spectrum ended before 3520 samples).
    """
    window = 3520
    split_wavelength = 4000   # wavelength at which the window starts

    # Sorted so the row order does not depend on the arbitrary order in which
    # the operating system lists the directory.
    fit_paths = []
    for name in sorted(os.listdir(path)):
        candidate = os.path.join(path, name)
        if name.lower().endswith(".fit") and os.path.isfile(candidate):
            fit_paths.append(candidate)

    rows = []
    for fit_file in fit_paths:
        # The context manager closes the FITS file; np.array copies the row
        # so it remains valid after the file is closed.
        with fits.open(fit_file) as hdul:
            header = hdul[0].header
            naxis1 = header['NAXIS1']   # number of pixels in the spectrum
            coeff0 = header['COEFF0']
            coeff1 = header['COEFF1']
            flux = np.array(hdul[0].data[0, :])

        # Wavelength of every pixel, then the last pixel whose interval
        # [wave[i], wave[i + 1]] contains the split wavelength.
        wave = 10 ** (coeff0 + np.arange(naxis1) * coeff1)
        crossings = np.flatnonzero(
            (wave[:-1] <= split_wavelength) & (wave[1:] >= split_wavelength)
        )
        start = int(crossings[-1]) if crossings.size else 0
        spec = flux[start:start + window]

        # Min-max normalisation.  A perfectly flat window has zero range;
        # map it to zeros instead of dividing by zero and producing NaNs.
        smin, smax = spec.min(), spec.max()
        span = smax - smin
        if span == 0:
            spec = np.zeros_like(spec)
        else:
            spec = (spec - smin) / span
        rows.append(spec)

    if not rows:
        return np.empty((0, window)), np.full(0, cla)
    # One stack at the end instead of re-copying the matrix for every file.
    x = np.vstack(rows)
    y = np.full(len(rows), cla)
    return x, y
def show_accuracy(y_hat, y_test):
    """Return the fraction of positions at which ``y_hat`` equals ``y_test``.

    Every index of ``y_test`` is compared with the same index of ``y_hat``;
    the number of matches is divided by ``len(y_test)``.
    """
    total = len(y_test)
    hits = sum((1.0 for k in range(total) if y_hat[k] == y_test[k]), 0.0)
    return hits / float(total)
if __name__ == '__main__':
    # (directory, class label) pairs, in the order their samples are stacked.
    # Raw strings avoid the invalid escape sequences ("\D", "\m") that the
    # plain literals contained; the resulting paths are unchanged.
    class_dirs = (
        (r"G:\DATA\m01520949", 0),
        (r"G:\DATA\m11520604", 1),
        (r"G:\DATA\m21520739", 2),
        (r"G:\DATA\m31520635", 3),
    )

    train_features, test_features = [], []
    train_labels, test_labels = [], []
    for directory, label in class_dirs:
        features, labels = getData(directory, label)
        # Each class is split 90/10 on its own, always with the same seed.
        part_x_train, part_x_test, part_y_train, part_y_test = train_test_split(
            features, labels, test_size=0.1, random_state=0)
        train_features.append(part_x_train)
        test_features.append(part_x_test)
        train_labels.append(part_y_train)
        test_labels.append(part_y_test)

    # Training set (np.vstack replaces the deprecated np.row_stack alias).
    x_train = np.vstack(train_features)
    y_train = np.concatenate(train_labels)
    # Test set
    x_test = np.vstack(test_features)
    y_test = np.concatenate(test_labels)

    print(x_train.shape)
    print(y_train.shape)
    print(x_test.shape)
    print(y_test.shape)

    model = MultinomialNB(alpha=1.0, fit_prior=True, class_prior=None)
    model.fit(x_train, y_train)
    y_hat = model.predict(x_test)
    print(show_accuracy(y_hat, y_test))

RNN分类:
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 8 15:10:35 2019

@author: DELL
"""

# -*- coding: utf-8 -*-
"""
Created on Sun Sep 29 15:02:36 2019

@author: DELL
"""

import torch
from torch import nn
from torch.autograd import Variable
import torchvision.datasets as dsets
import torch.utils.data as Data
import matplotlib.pyplot as plt
import torchvision
import os
from torch.utils import data
import numpy as np
from astropy.io import fits
from torchvision import transforms as T
from PIL import Image
import pandas as pd

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


class mydataset(data.Dataset):
    """Dataset backed by a comma-separated numeric file, one sample per row.

    Within each row the last column is the label and every preceding column
    is a feature value.
    """

    def __init__(self, csv_file, root_dir=None, transform=None):
        """Load the whole CSV file into memory.

        Args:
            csv_file: Path of the comma-delimited numeric file to load.
            root_dir: Stored on the instance; not used by this class.
            transform: Stored on the instance; not used by this class.
        """
        # Handing the path to NumPy lets it open *and close* the file; the
        # previous open(csv_file, "rb") handle was never closed.  ndmin=2
        # keeps a one-row file two-dimensional, so len() counts samples and
        # row indexing works instead of raising on a 1-D array.
        self.landmarks_frame = np.loadtxt(csv_file, delimiter=",", ndmin=2)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) loaded from the file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Return ``(label, features)`` for row ``idx``.

        Note the order: the label comes first, the feature vector second.
        """
        row = self.landmarks_frame[idx, :]
        return row[-1], row[:-1]


# Fixed seed so weight initialisation and shuffling repeat between runs.
torch.manual_seed(1)

EPOCH = 60
BATCH_SIZE = 16
LR = 0.00001
# NOTE(review): the three constants below are not referenced anywhere in the
# visible code; kept in case another part of the project reads them.
TIME_STEP = 28
IMPUT_SIZE = 28
DOWNLOAD_MNIST = True

train_data = mydataset(csv_file="G:\\DATA\\mtrain.csv")
test_data = mydataset(csv_file="G:\\DATA\\mtest.csv")
train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = Data.DataLoader(dataset=test_data, batch_size=400, shuffle=True)

# A single batch of (up to) 400 shuffled test samples is drawn once and
# reused for every accuracy report.  The builtin next() is used because
# DataLoader iterators no longer provide a .next() method in current PyTorch.
dataiter = iter(test_loader)
test_y, test_x = next(dataiter)   # the dataset yields (label, features)
# Each flat feature row is viewed as 5 steps of 704 values.
test_x = test_x.view(-1, 5, 704).float().to(device)
test_y = test_y.float().to(device)
print(test_y.shape)
print(test_x.shape)


# test_x=Variable(torch.unsqueeze(test_data.test_data,dim=1),volatile=True).type(torch.FloatTensor)/255
# test_x=test_x.to(device)
# test_y=(test_data.test_labels)
# test_y=test_y.to(device)
class RNN(nn.Module):
    """LSTM-based classifier that maps a batch of sequences to 4 outputs each.

    ``rnn1`` (704 -> 128) runs once; ``rnn2`` (128 -> 128) is then applied six
    times with the *same* weights, each time preceded by the shared ``bn``
    layer and seeded with the hidden/cell state left by the previous LSTM
    call.  The last time step of the final LSTM output goes through the
    ``out`` head (128 -> 64 -> 32 -> 4).

    ``bn`` is ``BatchNorm1d(5)`` applied to the batch-first LSTM output, so
    dimension 1 of the input (the sequence length) has to be 5.
    """

    def __init__(self):
        super().__init__()
        self.rnn1 = nn.LSTM(input_size=704, hidden_size=128, num_layers=1, batch_first=True)
        self.rnn2 = nn.LSTM(input_size=128, hidden_size=128, num_layers=1, batch_first=True)
        self.bn = nn.BatchNorm1d(5)
        head_layers = [
            nn.Linear(128, 64), nn.BatchNorm1d(64), nn.ReLU(True),
            nn.Linear(64, 32), nn.BatchNorm1d(32), nn.ReLU(True),
            nn.Linear(32, 4),
        ]
        self.out = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the head's output for input ``x`` of shape (batch, 5, 704)."""
        sequence, state = self.rnn1(x, None)
        # Six rounds of: normalise, then run the shared second LSTM starting
        # from the (h, c) state produced by the previous LSTM call.
        for _ in range(6):
            sequence = self.bn(sequence)
            sequence, state = self.rnn2(sequence, state)
        # Only the final time step feeds the classification head.
        return self.out(sequence[:, -1, :])


rnn = RNN()
rnn = rnn.to(device)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)
loss_func = nn.CrossEntropyLoss()
loss_func = loss_func.to(device)

for epoch in range(EPOCH):
    # The dataset yields (label, features), hence the (y, x) order.
    for step, (y, x) in enumerate(train_loader):
        # Each flat feature row is viewed as 5 steps of 704 values.
        b_x = x.view(-1, 5, 704).float().to(device)
        b_y = y.long().to(device)

        output = rnn(b_x)
        loss = loss_func(output, b_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step % 10 == 0:
            # Evaluate in eval mode and without autograd: in train mode the
            # BatchNorm layers would normalise with the test batch's own
            # statistics and fold test data into their running averages.
            rnn.eval()
            with torch.no_grad():
                test_output = rnn(test_x)
            rnn.train()

            # test_y is stored as float, so the predictions are cast to match.
            pred_y = test_output.argmax(dim=1).float()
            accuracy = float((pred_y == test_y).sum().item()) / float(test_y.size(0))
            print("Epoch: ", epoch, "| train loss : %.4f" % loss.item(), '| test accuracy: %.2f' % accuracy)