1. 개요
만들어야 하는 기능은 다음과 같습니다.
• 의사는 자신의 환자들에게 정해진 시간에 메시지를 보내고 싶어 합니다.
• 조건을 지정해 자신의 환자 중 특정 환자들에게만 메시지를 보낼 수 있으며, 전송 시간과 메시지 내용도 수정할 수 있습니다.
• 알람은 여러 개를 생성할 수 있으며, 알람마다 서로 다른 시간에, 서로 다른 환자들에게, 서로 다른 메시지를 보낼 수 있습니다.
2. v1.0.0
SchedulerConfig.java
/**
 * Spring configuration that registers the {@link TaskScheduler} bean.
 */
@Configuration
public class SchedulerConfig {

    /** Number of threads in the scheduler pool. */
    private static final int POOL_SIZE = 10;

    /** Prefix applied to the names of the scheduler's worker threads. */
    private static final String THREAD_NAME_PREFIX = "scheduled-task-";

    /** Seconds the scheduler is told to await task termination on shutdown. */
    private static final int AWAIT_TERMINATION_SECONDS = 60;

    /**
     * Creates, configures and initializes a thread-pool backed scheduler.
     *
     * @return the initialized {@link ThreadPoolTaskScheduler}
     */
    @Bean
    public TaskScheduler taskScheduler() {
        final ThreadPoolTaskScheduler pool = new ThreadPoolTaskScheduler();
        pool.setThreadNamePrefix(THREAD_NAME_PREFIX);
        pool.setPoolSize(POOL_SIZE);
        // Let in-flight tasks finish on shutdown, bounded by the await timeout.
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
        pool.initialize();
        return pool;
    }
}
Java
복사
NotificationCreateRequest.java
import com.digitalpharm.admin.alcohol.data.entity.Schedule;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.FieldDefaults;
/**
 * Request body for creating a notification.
 *
 * <p>Accessors, {@code toString} and both constructors are generated by Lombok.
 * NOTE(review): the {@code @Schema} description reads "question info request",
 * which looks copied from another DTO - confirm the intended API text.
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@Schema(description = "질문 정보 요청")
public class NotificationCreateRequest {

    // Fields are made private by @FieldDefaults(level = AccessLevel.PRIVATE).

    /** Message text of the notification. */
    String content;

    /** Target selector as raw text; presumably resolved via {@code Target.of} - verify against the caller. */
    String target;

    /** Schedule on which the notification should be sent. */
    Schedule schedule;
}
Java
복사
NotificationUpdateRequest.java
import com.digitalpharm.admin.alcohol.data.entity.Schedule;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.FieldDefaults;
/**
 * Request body for updating an existing notification.
 *
 * <p>Accessors, {@code toString} and both constructors are generated by Lombok.
 * NOTE(review): the {@code @Schema} description reads "question update request",
 * which looks copied from another DTO - confirm the intended API text.
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@Schema(description = "질문 수정 요청")
public class NotificationUpdateRequest {

    // Fields are made private by @FieldDefaults(level = AccessLevel.PRIVATE).

    /** Id of the admin issuing the update. */
    Long adminId;

    /** Id of the notification to update. */
    Long notificationId;

    /** New message text. */
    String content;

    /** New target selector as raw text; presumably resolved via {@code Target.of} - verify against the caller. */
    String target;

    /** New schedule for the notification. */
    Schedule schedule;
}
Java
복사
Notification.java
/**
 * Text-message notification that a staff member schedules for a list of patients.
 *
 * <p>Getters, setters, {@code toString}, the no-args constructor and a private
 * all-args constructor are generated by Lombok. Instances created through
 * {@link #of} start enabled; instances created through the no-args constructor
 * start disabled.
 *
 * <p>NOTE(review): other snippets in this document call {@code getCron()},
 * {@code disable()}, {@code getMessage()} and {@code setTarget(String)} on this
 * type; none of them are declared here - confirm which version of the class is current.
 */
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Setter
@ToString
@FieldDefaults(level = AccessLevel.PRIVATE)
@Schema(description = "문자 알림")
public class Notification {
    @Schema(description = "알림 ID")
    private Long id;
    @Schema(description = "스태프 ID")
    private Long staffId;
    @Schema(description = "알림 내용")
    private String content;
    @Schema(description = "알림 대상")
    private Target target;
    @Schema(description = "알림 일시")
    private Schedule schedule;
    @Schema(description = "환자 리스트")
    private List<String> patients = new ArrayList<>();
    @Schema(description = "알림 활성화 여부")
    private boolean isEnabled = false;

    /**
     * Creates an enabled notification without an id.
     *
     * @param patients patient identifiers; {@code null} is stored as an empty list
     */
    private Notification(Long staffId, String content, Target target, Schedule schedule,
            List<String> patients) {
        this.staffId = staffId;
        this.content = content;
        this.target = target;
        this.schedule = schedule;
        // Keep the "never null" invariant established by the field initializer.
        this.patients = patients != null ? patients : new ArrayList<>();
        this.isEnabled = true;
    }

    /**
     * Static factory for a new, enabled notification.
     *
     * @param staffId  id of the owning staff member
     * @param content  message text
     * @param target   target selector
     * @param schedule send schedule
     * @param patients patient identifiers; {@code null} is treated as empty
     * @return the new notification (its {@code id} is left unset)
     */
    public static Notification of(Long staffId, String content, Target target, Schedule schedule,
            List<String> patients) {
        return new Notification(staffId, content, target, schedule, patients);
    }

    /**
     * Tells whether the given admin owns this notification.
     *
     * <p>Ids are compared by value. Comparing two {@code Long} objects with
     * {@code ==} checks reference identity and fails for equal ids outside the
     * boxed-value cache.
     *
     * @param adminId id of the admin to check; may be {@code null}
     * @return {@code true} only when this notification has a staff id equal to {@code adminId}
     */
    public boolean isAutorized(Long adminId) {
        return staffId != null && staffId.equals(adminId);
    }
}
Java
복사
Schedule.java
/**
 * Holds the six fields of a cron expression and renders them as a single string.
 *
 * <p>The constructor is written out explicitly (same parameter order as the
 * field declarations) so that missing fields can be normalised to the
 * wildcard {@code "*"}. With a generated all-args constructor the field
 * defaults were always overwritten, and a {@code null} argument ended up as
 * the literal text {@code null} in the cron string.
 */
public class Schedule {
    /** Wildcard used for any field that is not supplied. */
    private static final String ANY = "*";

    private String week = ANY;
    private String month = ANY;
    private String day = ANY;
    private String hour = ANY;
    private String minute = ANY;
    private String second = ANY;

    /**
     * Creates a schedule; any {@code null} or blank argument becomes {@code "*"}.
     *
     * @param week   day-of-week field
     * @param month  month field
     * @param day    day-of-month field
     * @param hour   hour field
     * @param minute minute field
     * @param second second field
     */
    public Schedule(String week, String month, String day, String hour, String minute,
            String second) {
        this.week = orAny(week);
        this.month = orAny(month);
        this.day = orAny(day);
        this.hour = orAny(hour);
        this.minute = orAny(minute);
        this.second = orAny(second);
    }

    /**
     * Renders the fields in the order
     * {@code second minute hour day month week}, separated by single spaces.
     *
     * @return the cron expression string
     */
    public String convertToCron() {
        return String.format("%s %s %s %s %s %s", second, minute, hour, day, month, week);
    }

    /** Returns the trimmed field, or the wildcard when the field is null or blank. */
    private static String orAny(String field) {
        if (field == null) {
            return ANY;
        }
        // Surrounding whitespace would corrupt the space-delimited expression.
        String trimmed = field.trim();
        return trimmed.isEmpty() ? ANY : trimmed;
    }
}
Java
복사
Target.java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
/**
 * Selects which of a staff member's patients a notification is sent to.
 */
public enum Target {
    ALL,
    SATISFY,
    UNSATISFY;

    /**
     * Parses a target name, ignoring case and surrounding whitespace.
     *
     * <p>Upper-casing uses {@link Locale#ROOT} so the result does not depend on
     * the JVM default locale (under a Turkish locale {@code "satisfy"} would
     * otherwise become {@code "SATİSFY"} and fail to match).
     *
     * @param target the target name, e.g. {@code "all"}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant has that name
     * @throws NullPointerException     if {@code target} is {@code null}
     */
    public static Target of(String target) {
        return Target.valueOf(target.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * Narrows the given patient list according to this target.
     *
     * @param patients patient identifiers
     * @return the same list instance for {@link #ALL}; otherwise a new list
     *         without {@code null} or empty identifiers
     */
    public List<String> updateTargetPatients(List<String> patients) {
        if (this == ALL) {
            return patients;
        }
        // TODO: logic is planned to change.
        // The predicate must look at the element; checking the list itself
        // (patients.isEmpty()) is always false inside the stream and filters nothing.
        return patients.stream()
                .filter(patient -> patient != null && !patient.isEmpty())
                .collect(Collectors.toList());
    }
}
Java
복사
NotificationModule.java
/**
 * Schedules SMS notifications on a {@link TaskScheduler} and keeps the
 * resulting futures so that a notification can later be stopped.
 */
@Component
@RequiredArgsConstructor
public class NotificationModule {
    private final TaskScheduler taskScheduler;
    private final DpAuthService dpAuthService;
    private final DpSmsApiService dpSmsApiService;
    /** Active schedules, keyed by notification id. */
    private final ConcurrentMap<Long, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * Schedules the notification on its cron expression.
     *
     * <p>If a schedule is already registered under the same notification id it
     * is cancelled, so pushing an updated notification replaces the old
     * schedule instead of running both.
     *
     * @param notification the notification to schedule; its id is used as the map key
     * @throws IllegalArgumentException if the cron expression is rejected by {@link CronTrigger}
     */
    public void push(Notification notification) {
        // NOTE: a failure for one patient propagates out of forEach, so the
        // remaining patients of that run are skipped.
        Runnable task = () -> notification.getPatients().forEach(uid -> {
            UserInfo userInfo = dpAuthService.getUserInfo(uid);
            try {
                dpSmsApiService.sendSms(userInfo.getPhoneNumber(),
                        notification.getContent());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        CronTrigger trigger;
        try {
            trigger = new CronTrigger(notification.getCron());
        } catch (IllegalArgumentException e) {
            // Keep the original exception as the cause so the parse detail is not lost.
            throw new IllegalArgumentException("잘못된 cron 표현식입니다.", e);
        }

        ScheduledFuture<?> future = taskScheduler.schedule(task, trigger);
        ScheduledFuture<?> previous;
        if (future == null) {
            // schedule() returns null when the trigger never fires; ConcurrentHashMap
            // rejects null values, so only drop any stale entry.
            previous = scheduledFutureMap.remove(notification.getId());
        } else {
            previous = scheduledFutureMap.put(notification.getId(), future);
        }
        if (previous != null) {
            previous.cancel(true);
        }
    }

    /**
     * Cancels the schedule registered for the notification, if any, and
     * disables the notification.
     *
     * <p>The entry is removed from the map so cancelled futures do not accumulate.
     *
     * @param notification the notification whose schedule should be stopped
     */
    public void stop(Notification notification) {
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.remove(notification.getId());
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            notification.disable();
        }
    }
}
Java
복사
3. v1.1.0
스프링 스케줄러(@Scheduled)와 스프링 배치(Spring Batch)를 사용해 로직을 재구성했습니다.
NotificationModule.java
/**
 * Periodically looks up notifications and launches the matching Spring Batch job
 * for each of them.
 */
@Slf4j
@RequiredArgsConstructor
@Component
public class NotificationModule {
    private final NotificationMapper notificationMapper;
    private final JobLauncher jobLauncher;
    // NOTE(review): Lombok copies @Qualifier onto the generated constructor only when
    // configured to; otherwise injection presumably relies on the field names
    // matching the bean names - confirm.
    @Qualifier("notificationTriggerJob")
    private final Job notificationTriggerJob;
    @Qualifier("notificationDispatcherJob")
    private final Job notificationDispatcherJob;
    private final TimeModule timeModule;

    /**
     * Finds the notifications scheduled for the current day-of-week, hour and
     * minute and launches the trigger job for every enabled one.
     *
     * <p>Runs on the cron expression from the {@code schedule.cron.reward.publish}
     * property (the original note says it is meant to run every minute).
     * Failures of a single launch are logged and do not stop the remaining ones.
     */
    @Scheduled(cron = "${schedule.cron.reward.publish}")
    public void notificationTrigger() {
        log.info("Start notificationTrigger()");
        LocalDateTime now = timeModule.now();
        log.info("Test dayOfWeek, hour, minute : {}, {}, {}", now.getDayOfWeek().getValue(), now.getHour(), now.getMinute());
        List<Notification> notifications = notificationMapper.findAllScheduledNotifications(
                now.getDayOfWeek().getValue(), now.getHour(), now.getMinute());
        // Nothing is due in most runs; return before touching element 0.
        if (notifications == null || notifications.isEmpty()) {
            return;
        }
        log.info("notificationId : {}", notifications.get(0).getId());
        for (Notification notification : notifications) {
            if (!notification.isEnabled()) {
                continue;
            }
            try {
                log.info("Start for-each : {}", notification.getId());
                // Spring Batch refuses to re-run a completed job instance with identical
                // parameters; the timestamp makes each launch unique so a recurring
                // notification can fire again.
                JobParameters jobParameters = new JobParametersBuilder()
                        .addLong("notificationId", notification.getId())
                        .addLong("time", System.currentTimeMillis())
                        .toJobParameters();
                jobLauncher.run(notificationTriggerJob, jobParameters);
            } catch (Exception e) {
                log.error("Notification Trigger Error", e);
            }
        }
    }

    /**
     * Runs every minute, looks up the unchecked notifications and launches the
     * dispatcher job for each one.
     *
     * <p>Failures of a single launch are logged and do not stop the remaining ones.
     */
    @Scheduled(cron = "0 */1 * * * *")
    public void notificationDispatcher() {
        List<Notification> notifications = notificationMapper.findAllUncheckedNotifications();
        if (notifications == null || notifications.isEmpty()) {
            return;
        }
        for (Notification notification : notifications) {
            try {
                JobParameters jobParameters = new JobParametersBuilder()
                        .addLong("notificationId", notification.getId())
                        .addLong("staffId", notification.getStaffId())
                        .addLong("time", System.currentTimeMillis())
                        .toJobParameters();
                jobLauncher.run(notificationDispatcherJob, jobParameters);
            } catch (Exception e) {
                log.error("Notification Dispatcher Error", e);
            }
        }
    }
}
Java
복사
NotificationDispatcherConfig.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class NotificationDispatcherConfig {
private final DataSource dataSource;
@Bean
public Job notificationDispatcherJob(final JobRepository jobRepository,
final Step notificationDispatcherStep) {
log.info("Start notificationDispatcherJob()");
return new JobBuilder("notificationDispatcherJob")
.repository(jobRepository)
.start(notificationDispatcherStep)
.build();
}
@Bean
@JobScope
public Step notificationDispatcherStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
@Value("#{jobParameters['notificationId']}") Long notificationId,
@Value("#{jobParameters['staffId']}") Long staffId) {
log.info("Start notificationDispatcherStep()");
return new StepBuilder("notificationDispatcherStep")
.<String, String>chunk(10)
.reader(notificationDispatcherReader(staffId))
.processor(notificationDispatcherProcessor(notificationId))
.writer(notificationDispatcherWriter(notificationId))
.repository(jobRepository)
.transactionManager(transactionManager)
.build();
}
@Bean
@StepScope
public JdbcCursorItemReader<String> notificationDispatcherReader(
@Value("#{jobParameters['staffId']}") Long staffId) {
log.info("Start notificationDispatcherReader()");
return new JdbcCursorItemReaderBuilder<String>()
.name("notificationDispatcherReader")
.dataSource(dataSource)
.sql(
"SELECT UID FROM TB_PSCT_CODE WHERE stff_id = ?")
.preparedStatementSetter(ps -> ps.setLong(1, staffId))
.rowMapper((rs, rowNum) -> rs.getString("UID"))
.build();
}
@Bean
@StepScope
public ItemProcessor<String, String> notificationDispatcherProcessor(
@Value("#{jobParameters['notificationId']}") Long notificationId) {
log.info("Start notificationDispatcherProcessor()");
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
return pid -> {
pid = pid.trim();
// 굳이 기록하지 않더라도 타겟에 맞는 사람만 전송하면 될듯?
if (!pid.isEmpty()) {
jdbcTemplate.update(
"INSERT INTO TB_PATIENT_NOTIFICATION_1 (notification_id, patient_uid) VALUES (?, ?)",
notificationId, pid);
// Send 로직 추가
// 만약 여기에서 실패하는 친구가 생긴다면 어떻게 처리할 것인가?
}
return pid;
};
}
// ItemReader에 의해 조회된 Notification 객체들이 Writer로 전달된다.
@Bean
@StepScope
public ItemWriter<String> notificationDispatcherWriter(
@Value("#{jobParameters['notificationId']}") Long notificationId) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
return pids -> {
pids.stream().filter(pid -> !pid.isEmpty()).forEach(pid -> {
jdbcTemplate.update(
"INSERT INTO tb_notification_log_1 (notification_id, patient_uid, is_succeed, send_date) VALUES (?, ?, ?, ?)",
ps -> {
ps.setLong(1, notificationId);
ps.setString(2, pid);
ps.setBoolean(3, true); // status
ps.setTimestamp(4, Timestamp.valueOf(LocalDateTime.now())); // created_at
});
});
};
}
}
Java
복사
NotificationTriggerConfig.java
/**
 * Spring Batch configuration of the trigger job: loads one notification by id,
 * updates its modification date, and inserts a row for it into
 * {@code tb_notification_4} with {@code is_checked = false}.
 */
@Slf4j
@RequiredArgsConstructor
@Configuration
public class NotificationTriggerConfig {

    private static final String SELECT_NOTIFICATION_SQL =
            "SELECT id, staff_id, message, target FROM tb_notification_3 WHERE id = ?";

    private static final String TOUCH_NOTIFICATION_SQL =
            "UPDATE TB_NOTIFICATION_3 SET MOD_DATE = NOW() WHERE ID = ?";

    private static final String INSERT_PENDING_NOTIFICATION_SQL =
            "INSERT INTO tb_notification_4 (notification_id, staff_id, message, target, is_checked) VALUES (?, ?, ?, ?, ?)";

    private final DataSource dataSource;

    /**
     * Defines the trigger job, consisting of the single trigger step.
     */
    @Bean
    public Job notificationTriggerJob(final JobRepository jobRepository,
            final Step notificationTriggerStep) {
        log.info("Start notificationTriggerJob()");
        final JobBuilder jobBuilder = new JobBuilder("notificationTriggerJob");
        return jobBuilder
                .repository(jobRepository)
                .start(notificationTriggerStep)
                .build();
    }

    /**
     * Defines the chunk-oriented step (chunk size 10) wiring reader, processor
     * and writer together.
     *
     * @param notificationId value of the {@code notificationId} job parameter
     */
    @Bean
    @JobScope
    public Step notificationTriggerStep(JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            @Value("#{jobParameters['notificationId']}") Long notificationId) {
        log.info("Start notificationTriggerStep()");
        final JdbcCursorItemReader<Notification> reader = notificationTriggerReader(notificationId);
        final ItemProcessor<Notification, Notification> processor = notificationTriggerProcessor();
        final ItemWriter<Notification> writer = notificationTriggerWriter();
        return new StepBuilder("notificationTriggerStep")
                // In <T1, T2>, T1 is the type returned by the reader and T2 is the type the
                // processor hands to the writer.
                // Author's note: it is advisable to keep the chunk size equal to the page size.
                .<Notification, Notification>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .repository(jobRepository)
                .transactionManager(transactionManager)
                .build();
    }

    /**
     * Cursor reader that selects the {@code tb_notification_3} row with the
     * given id and maps it onto a {@link Notification}.
     */
    @Bean
    @StepScope
    public JdbcCursorItemReader<Notification> notificationTriggerReader(
            @Value("#{jobParameters['notificationId']}") Long notificationId) {
        log.info("Start notificationTriggerReader()");
        final JdbcCursorItemReaderBuilder<Notification> readerBuilder =
                new JdbcCursorItemReaderBuilder<>();
        return readerBuilder
                .name("notificationTriggerReader")
                .dataSource(dataSource)
                .sql(SELECT_NOTIFICATION_SQL)
                .preparedStatementSetter(statement -> statement.setLong(1, notificationId))
                .rowMapper((resultSet, index) -> {
                    final Notification row = new Notification();
                    row.setId(resultSet.getLong("id"));
                    row.setStaffId(resultSet.getLong("staff_id"));
                    row.setMessage(resultSet.getString("message"));
                    row.setTarget(resultSet.getString("target"));
                    return row;
                })
                .build();
    }

    /**
     * Processor that sets {@code MOD_DATE} of the notification row to the
     * database's current time and passes the notification on unchanged.
     */
    @Bean
    @StepScope
    public ItemProcessor<Notification, Notification> notificationTriggerProcessor() {
        log.info("Start ItemProcessor()");
        final JdbcTemplate jdbc = new JdbcTemplate(dataSource);
        return item -> {
            log.info("Start update modDate");
            jdbc.update(TOUCH_NOTIFICATION_SQL, item.getId());
            return item;
        };
    }

    /**
     * Writer that receives the notifications read by the reader (after the
     * processor) and inserts one {@code tb_notification_4} row per notification
     * with {@code is_checked} set to {@code false}.
     */
    @Bean
    public ItemWriter<Notification> notificationTriggerWriter() {
        log.info("Start ItemWriter()");
        final JdbcTemplate jdbc = new JdbcTemplate(dataSource);
        return items -> items.forEach(item ->
                jdbc.update(INSERT_PENDING_NOTIFICATION_SQL,
                        item.getId(), item.getStaffId(), item.getMessage(),
                        item.getTarget(), false));
    }
}
Java
복사