[LSTF-Linear] Are Transformers Effective for Time Series Forecasting
Introduction
최근 Transformer 기반 모델은 Long-Term Time Series Forecasting에서 기존의 Non-Transformer 기반 방법론보다 정확도가 우수한 것으로 나타났습니다. 그러나 이러한 실험에서 기존의 베이스라인 모델은 Autoregressive 방식으로 예측되었으며, 이는 Error Accumulation Effect를 유발할 수 있으므로 성능이 좋지 않을 수밖에 없습니다.
Transformer 구조의 핵심은 Multi-Head Self Attention입니다. 이 구조는 Long Sequence에서 Paired Element 간의 잠재적인 상관관계를 추출하는 데 효과적이지만, 시간 순서를 고려하지 않고 있습니다. 시계열 데이터 분석의 경우 입력 데이터의 순서가 연속적인 지점 사이에 존재하는 Temporal Dynamics를 찾는 데 중요한 역할을 하기 때문에 Multi-Head Self Attention의 동작 원리를 고려할 때 Transformer가 시계열 데이터 분석에 효과적인지 질문할 필요가 있습니다.

본 논문은 Transformer 기반 모델에서 Lookback Window의 크기를 늘릴수록 예측 오류가 감소해야 하지만 그렇지 않은 현상, 심지어 Lookback Window의 크기를 늘렸음에도 예측 오류가 증가하는 현상을 지적합니다. 그리고 저자는 Transformer와 같이 복잡한 모델을 사용하지 않고 단순히 2개의 1-Layer Linear Network만을 사용하는 모델을 제안하고, 여러 LTSF 벤치마크 데이터 세트에서 제안된 모델이 기존 모델을 능가하는 것을 실험을 통해 보이고자 합니다.
위와 같은 내용으로 간단한 선형 모델이 Long Term 시계열 데이터에서 시간 순서 정보를 보존하면서 추세와 주기성에 대한 특징을 더 잘 추출할 수 있기 때문에 LTSF-Linear(Linear, DLinear, NLinear) 모델을 제안했습니다. 해당 모델은 단순한 단일 선형 레이어로 구성되었지만 9개의 벤치마크 데이터 세트에서 기존의 Transformer 기반 모델보다 우수한 성능을 보여줬습니다.
Models
1. DLinear
DLinear 모델은 Autoformer와 FEDformer 모델에서 사용되는 시계열 분해 방식을 선형 레이어와 결합한 모델입니다. 먼저 이동 평균을 계산한 다음 이를 제거하여 추세와 주기성 데이터로 분해합니다. 그런 다음 각 구성 요소에 단일 선형 레이어를 적용하고 두 개를 합산하여 최종 예측을 계산합니다. 이 모델은 시계열 데이터에 명확한 추세와 주기성이 있을 때 기존 선형 모델보다 더 나은 성능을 발휘할 수 있습니다.

2. NLinear
NLinear 모델은 선형 모델의 변형으로, 가장 마지막 값을 뺀 후 모델을 학습하고 가장 마지막에 다시 더하여 예측을 계산합니다. 이렇게 하면 분포 이동이 방지되어 학습 데이터와 평가 데이터 간의 분포가 일치하지 않기 때문에 발생할 수 있는 예측 오류를 방지할 수 있습니다. 이러한 개선으로 NLinear 모델은 ETth1, ETth2 및 ILI 벤치마크 데이터셋에 대해 우수한 성능을 달성했습니다.

이외에도 하나의 레이어를 갖고 있는 Linear 모델이 Transformer 시계열 모델보다 성능이 좋다는 것을 실험적으로 확인했습니다.
LSTF-Linear 알고리즘 Pytorch
import torch
import pyupbit
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
class moving_avg(nn.Module):
"""
Moving average block to highlight the trend of time series
"""
def __init__(self, kernel_size, stride):
super(moving_avg, self).__init__()
self.kernel_size = kernel_size
self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)
def forward(self, x):
# padding on the both ends of time series
front = x[:, 0:1, :].repeat(1, (self.kernel_size - 1) // 2, 1)
end = x[:, -1:, :].repeat(1, (self.kernel_size - 1) // 2, 1)
x = torch.cat([front, x, end], dim=1)
x = self.avg(x.permute(0, 2, 1))
x = x.permute(0, 2, 1)
return x
class series_decomp(nn.Module):
"""
Series decomposition block
"""
def __init__(self, kernel_size):
super(series_decomp, self).__init__()
self.moving_avg = moving_avg(kernel_size, stride=1)
def forward(self, x):
moving_mean = self.moving_avg(x)
res = x - moving_mean
return res, moving_mean
class Dlinear(nn.Module):
"""
D-Linear
"""
def __init__(self, window_size, forcast_size, feature_size=4, kernel_size=25, individual=False):
super(Dlinear, self).__init__()
# Decompsition Kernel Size
self.seq_len = window_size
self.pred_len = forcast_size
self.channels = feature_size
self.decompsition = series_decomp(kernel_size)
self.individual = individual
if self.individual:
self.Linear_Seasonal = nn.ModuleList()
self.Linear_Trend = nn.ModuleList()
for i in range(self.channels):
self.Linear_Seasonal.append(nn.Linear(self.seq_len,self.pred_len))
self.Linear_Trend.append(nn.Linear(self.seq_len,self.pred_len))
else:
self.Linear_Seasonal = nn.Linear(self.seq_len,self.pred_len)
self.Linear_Trend = nn.Linear(self.seq_len,self.pred_len)
def forward(self, x):
# x: [Batch, Input length, Channel]
seasonal_init, trend_init = self.decompsition(x)
seasonal_init, trend_init = seasonal_init.permute(0,2,1), trend_init.permute(0,2,1)
if self.individual:
seasonal_output = torch.zeros([seasonal_init.size(0),seasonal_init.size(1),self.pred_len],dtype=seasonal_init.dtype).to(seasonal_init.device)
trend_output = torch.zeros([trend_init.size(0),trend_init.size(1),self.pred_len],dtype=trend_init.dtype).to(trend_init.device)
for i in range(self.channels):
seasonal_output[:,i,:] = self.Linear_Seasonal[i](seasonal_init[:,i,:])
trend_output[:,i,:] = self.Linear_Trend[i](trend_init[:,i,:])
else:
seasonal_output = self.Linear_Seasonal(seasonal_init)
trend_output = self.Linear_Trend(trend_init)
x = seasonal_output + trend_output
return x.permute(0,2,1) # to [Batch, Output length, Channel]
class LTSF_Linear(torch.nn.Module):
def __init__(self, window_size, forcast_size, individual, feature_size = 4, individual=False):
super(LTSF_Linear, self).__init__()
self.window_size = window_size
self.forcast_size = forcast_size
self.individual = individual
self.channels = feature_size
if self.individual:
self.Linear = torch.nn.ModuleList()
for i in range(self.channels):
self.Linear.append(torch.nn.Linear(self.window_size, self.forcast_size))
else:
self.Linear = torch.nn.Linear(self.window_size, self.forcast_size)
def forward(self, x):
if self.individual:
output = torch.zeros([x.size(0),self.pred_len,x.size(2)],dtype=x.dtype).to(x.device)
for i in range(self.channels):
output[:,:,i] = self.Linear[i](x[:,:,i])
x = output
else:
x = self.Linear(x.permute(0,2,1)).permute(0,2,1)
return x
class Nlinear(torch.nn.Module):
def __init__(self, window_size, forcast_size, feature_size=4, individual=False):
super(Nlinear, self).__init__()
self.window_size = window_size
self.forcast_size = forcast_size
self.channels = feature_size
self.individual = individual
if self.individual:
self.Linear = torch.nn.ModuleList()
for i in range(self.channels):
self.Linear.append(torch.nn.Linear(self.window_size, self.forcast_size))
else:
self.Linear = torch.nn.Linear(self.window_size, self.forcast_size)
def forward(self, x):
seq_last = x[:,-1:,:].detach()
x = x - seq_last
if self.individual:
output = torch.zeros([x.size(0), self.forcast_size, x.size(2)],dtype=x.dtype).to(x.device)
for i in range(self.channels):
output[:,:,i] = self.Linear[i](x[:,:,i])
x = output
else:
x = self.Linear(x.permute(0,2,1)).permute(0,2,1)
x = x + seq_last
return x
def targetParsing(data,target,index=False):
if index==False:
result=data.loc[:,target]
else:
result=data.iloc[:,target]
return list(result.index), result.to_numpy()
def transform(raw,check_inverse=False):
data=raw.reshape(-1,1)
if check_inverse==False:
return scaler.fit_transform(data)
else:
return scaler.inverse_transform(data)[:,0]
class windowDataset(Dataset):
def __init__(self, y, input_window, output_window, stride=1):
#총 데이터의 개수
L = y.shape[0]
#stride씩 움직일 때 생기는 총 sample의 개수
num_samples = (L - input_window - output_window) // stride + 1
#input과 output
X = np.zeros([input_window, num_samples])
Y = np.zeros([output_window, num_samples])
for i in np.arange(num_samples):
start_x = stride*i
end_x = start_x + input_window
X[:,i] = y[start_x:end_x]
start_y = stride*i + input_window
end_y = start_y + output_window
Y[:,i] = y[start_y:end_y]
X = X.reshape(X.shape[0], X.shape[1], 1).transpose((1,0,2)) #X:(num_samples,input_window,1)
Y = Y.reshape(Y.shape[0], Y.shape[1], 1).transpose((1,0,2)) #Y:(num_samples,output_window,1)
self.x = X
self.y = Y
self.len = len(X)
def __getitem__(self, i):
return self.x[i], self.y[i]
def __len__(self):
return self.len
def customDataLoader(data,window_size:int,forecast_size:int,batch_size:int):
train=transform(data)[:-window_size,0]
dataset=windowDataset(train,window_size,forecast_size)
result=DataLoader(dataset,batch_size=batch_size)
return result
class trainer():
def __init__(self, data, dataloader, window_size, forecast_size, name="DLinear", feature_size=4, lr=1e-4):
self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
self.data=data
self.trains=transform(data)[:-window_size,0]
self.dataloader=dataloader
self.window_size=window_size
self.forecast_size=forecast_size
if name=="DLinear":
self.model=Dlinear(window_size,forecast_size).to(self.device)
elif name == "NLinear":
self.model=Nlinear(window_size,forecast_size).to(self.device)
else:
self.model=LSTF_Linear(window_size,forecast_size).to(self.device)
self.feature_size=feature_size
self.name=name
self.criterion = nn.MSELoss()
self.optimizer=torch.optim.Adam(self.model.parameters(), lr=lr)
def train(self, epoch=100):
self.model.train()
progress=tqdm(range(epoch))
losses=[]
for i in progress:
batchloss = 0.0
for (inputs, outputs) in self.dataloader:
self.optimizer.zero_grad()
result = self.model(inputs.float().to(self.device))
loss = self.criterion(result, outputs.float().to(self.device))
loss.backward()
self.optimizer.step()
batchloss += loss
losses.append(batchloss.cpu().item())
progress.set_description("loss: {:0.6f}".format(batchloss.cpu().item() / len(self.dataloader)))
plt.plot(losses)
def evaluate(self):
window_size=self.window_size
input = torch.tensor(self.trains[-window_size:]).reshape(1,-1,1).float().to(self.device)
self.model.eval()
predictions = self.model(input)
return predictions.detach().cpu().numpy()
def implement(self):
process=trainer(self.data,self.dataloader,self.window_size,
self.forecast_size,self.feature_size,self.name)
process.train()
evaluate=process.evaluate()
result=transform(evaluate,check_inverse=True)
return result
def figureplot(date,data,pred,window_size,forecast_size):
datenum=mdates.date2num(date)
len=data.shape[0]
fig, ax = plt.subplots(figsize=(20,5))
ax.plot(datenum[len-window_size:len], data[len-window_size:], label="Real")
ax.plot(datenum[len-forecast_size:len], pred, label="LSTF-linear")
locator = mdates.AutoDateLocator()
formatter = mdates.AutoDateFormatter(locator)
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)
plt.legend()
plt.show()
scaler=MinMaxScaler()
raw = pyupbit.get_ohlcv(ticker='KRW-BTC',interval='minute15', count = 500)
window_size=128
forecast_size= 10
date, data =targetParsing(raw,'close') # preprocess raw data
dataloader=customDataLoader(data, window_size,forecast_size, batch_size=4) #make dataloader
pred=trainer(data, dataloader, window_size, forecast_size, name="NLinear").implement() #train and evaluate
figureplot(date,data,pred,window_size,forecast_size) #plot the result

Reference:
- https://github.com/cure-lab/LTSF-Linear
- https://arxiv.org/pdf/2205.13504v2.pdf