Search
📿

스프링 스케줄러를 활용하여 푸시 알림 전송하기

Tags
Task
Date
2024/03/18

1. 개요

만들어야 하는 기능은 다음과 같습니다.
의사는 자신의 환자들에게 정해진 시간에 메시지를 보내고 싶어 합니다.
조건을 지정해 자신의 환자 중 특정 환자들에게만 메시지를 보낼 수 있으며, 전송 시간과 메시지 내용도 수정할 수 있습니다.
알림은 여러 개 생성할 수 있으며, 알림마다 전송 시간, 대상 환자, 메시지를 다르게 설정할 수 있습니다.

2. v1.0.0

SchedulerConfig.java
/**
 * Spring configuration that exposes the {@link TaskScheduler} bean.
 */
@Configuration
public class SchedulerConfig {

    /**
     * Creates and initializes a {@link ThreadPoolTaskScheduler}.
     *
     * <p>The scheduler is configured with a pool size of 10, the thread name prefix
     * {@code scheduled-task-}, wait-for-tasks-on-shutdown enabled, and an await-termination
     * value of 60 seconds.
     *
     * @return the initialized scheduler
     */
    @Bean
    public TaskScheduler taskScheduler() {
        final ThreadPoolTaskScheduler pool = new ThreadPoolTaskScheduler();

        // Thread pool shape.
        pool.setThreadNamePrefix("scheduled-task-");
        pool.setPoolSize(10);

        // Shutdown behaviour.
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);

        pool.initialize();
        return pool;
    }
}
Java
복사
NotificationCreateRequest.java
import com.digitalpharm.admin.alcohol.data.entity.Schedule;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.FieldDefaults;

/**
 * Request payload for creating a notification.
 *
 * <p>Accessors, constructors and {@code toString} are generated by Lombok.
 * The OpenAPI description previously read "question info request", which did not
 * match this class; it now describes a notification create request.
 */
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
@FieldDefaults(level = AccessLevel.PRIVATE)
@Schema(description = "알림 생성 요청")
public class NotificationCreateRequest {

    /** Message text of the notification. */
    private String content;

    /** Recipient group as text; presumably one of the Target enum names - TODO confirm. */
    private String target;

    /** When the notification should fire. */
    private Schedule schedule;
}
Java
복사
NotificationUpdateRequest.java
import com.digitalpharm.admin.alcohol.data.entity.Schedule; import io.swagger.v3.oas.annotations.media.Schema; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; import lombok.experimental.FieldDefaults; @NoArgsConstructor @AllArgsConstructor @Getter @Setter @ToString @FieldDefaults(level = AccessLevel.PRIVATE) @Schema(description = "질문 수정 요청") public class NotificationUpdateRequest { private Long adminId; private Long notificationId; private String content; private String target; private Schedule schedule; }
Java
복사
Notification.java
/**
 * A text-message notification that a staff member sends to a list of patients.
 *
 * <p>Getters, setters, {@code toString}, the no-arg constructor and a private all-args
 * constructor are generated by Lombok. New instances are normally obtained through
 * {@link #of(Long, String, Target, Schedule, List)}, which creates them enabled.
 */
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Setter
@ToString
@FieldDefaults(level = AccessLevel.PRIVATE)
@Schema(description = "문자 알림")
public class Notification {

    @Schema(description = "알림 ID")
    private Long id;

    @Schema(description = "스태프 ID")
    private Long staffId;

    @Schema(description = "알림 내용")
    private String content;

    @Schema(description = "알림 대상")
    private Target target;

    @Schema(description = "알림 일시")
    private Schedule schedule;

    @Schema(description = "환자 리스트")
    private List<String> patients = new ArrayList<>();

    @Schema(description = "알림 활성화 여부")
    private boolean isEnabled = false;

    private Notification(Long staffId, String content, Target target, Schedule schedule,
            List<String> patients) {
        this.staffId = staffId;
        this.content = content;
        this.target = target;
        this.schedule = schedule;
        // Keep the "never null" default of the field when the caller passes null.
        this.patients = (patients != null) ? patients : new ArrayList<>();
        this.isEnabled = true;
    }

    /**
     * Creates an enabled notification without an ID.
     *
     * @param staffId  ID of the staff member who owns the notification
     * @param content  message text
     * @param target   recipient group
     * @param schedule when the notification fires
     * @param patients patient identifiers to notify; {@code null} is treated as an empty list
     * @return the new notification
     */
    public static Notification of(Long staffId, String content, Target target, Schedule schedule,
            List<String> patients) {
        return new Notification(staffId, content, target, schedule, patients);
    }

    /**
     * Tells whether the given admin owns this notification.
     *
     * <p>The IDs are compared by value. Comparing boxed {@code Long} objects with {@code ==}
     * checks reference identity and returns {@code false} for equal values outside the
     * small-integer cache.
     *
     * @param adminId ID to check; may be {@code null}
     * @return {@code true} only when this notification has a staff ID and it equals
     *         {@code adminId}
     */
    public boolean isAutorized(Long adminId) {
        return staffId != null && staffId.equals(adminId);
    }
}
Java
복사
Schedule.java
/**
 * Holds the six fields of a cron expression as plain strings.
 *
 * <p>NOTE(review): the only constructor is the Lombok-generated all-args constructor, which
 * assigns every field, so the {@code "*"} initializers below never take effect. A field passed
 * as {@code null} is rendered as the text {@code "null"} by {@link #convertToCron()}.
 */
@AllArgsConstructor
public class Schedule {

    // The all-args constructor takes its parameters in this declaration order.
    private String week = "*";
    private String month = "*";
    private String day = "*";
    private String hour = "*";
    private String minute = "*";
    private String second = "*";

    /**
     * Joins the fields with single spaces in the order
     * second, minute, hour, day, month, week.
     *
     * @return the six-field cron expression
     */
    public String convertToCron() {
        return String.join(" ", second, minute, hour, day, month, week);
    }
}
Java
복사
Target.java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

/**
 * Recipient group of a notification.
 */
public enum Target {
    ALL, SATISFY, UNSATISFY;

    /**
     * Resolves a target from its name, ignoring case and surrounding whitespace.
     *
     * <p>Upper-casing uses {@link Locale#ROOT}. The default-locale variant maps {@code i} to a
     * dotted capital I under a Turkish locale, which would make {@code "satisfy"} unresolvable.
     *
     * @param target the constant name in any letter case
     * @return the matching constant
     * @throws IllegalArgumentException if no constant has that name
     * @throws NullPointerException     if {@code target} is {@code null}
     */
    public static Target of(String target) {
        return Target.valueOf(target.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * Narrows a patient list to the patients this target applies to.
     *
     * <p>{@link #ALL} returns the given list itself. Every other target currently returns a new
     * list with {@code null} and empty identifiers removed. The predicate used to test the
     * whole list instead of the element, so it never filtered anything.
     *
     * @param patients patient identifiers; must not be {@code null}
     * @return the patients to notify
     */
    public List<String> updateTargetPatients(List<String> patients) {
        if (this == ALL) {
            return patients;
        }
        // TODO: the selection logic for SATISFY / UNSATISFY is still to be changed.
        return patients.stream()
                .filter(patient -> patient != null && !patient.isEmpty())
                .collect(Collectors.toList());
    }
}
Java
복사
NotificationModule.java
/**
 * Schedules notifications on the {@link TaskScheduler} and keeps one handle per notification
 * ID so a scheduled notification can be cancelled later.
 */
@Component
@RequiredArgsConstructor
public class NotificationModule {

    private final TaskScheduler taskScheduler;
    private final DpAuthService dpAuthService;
    private final DpSmsApiService dpSmsApiService;

    /** Active schedule handles keyed by notification ID. */
    private final ConcurrentMap<Long, ScheduledFuture<?>> scheduledFutureMap =
            new ConcurrentHashMap<>();

    /**
     * Schedules the notification according to its cron expression.
     *
     * <p>If a schedule is already registered under the same notification ID it is cancelled, so
     * re-pushing an edited notification replaces the old schedule instead of leaving an
     * unreachable task that keeps sending.
     *
     * @param notification the notification to schedule
     * @throws IllegalArgumentException if the cron expression is rejected; the original
     *                                  exception is attached as the cause
     */
    public void push(Notification notification) {
        Runnable task = () -> sendToAllPatients(notification);
        try {
            CronTrigger trigger = new CronTrigger(notification.getCron());
            ScheduledFuture<?> future = taskScheduler.schedule(task, trigger);
            // schedule() may return null when the trigger never fires, and ConcurrentHashMap
            // rejects null values, so in that case only drop any stale handle.
            ScheduledFuture<?> previous = (future != null)
                    ? scheduledFutureMap.put(notification.getId(), future)
                    : scheduledFutureMap.remove(notification.getId());
            if (previous != null) {
                // Let a send that is already in progress finish; only prevent future runs.
                previous.cancel(false);
            }
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("잘못된 cron 표현식입니다.", e);
        }
    }

    /**
     * Cancels the schedule registered for the notification, if any, and disables it.
     *
     * <p>The handle is removed from the map so cancelled entries do not accumulate.
     *
     * @param notification the notification to stop
     */
    public void stop(Notification notification) {
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.remove(notification.getId());
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            notification.disable();
        }
    }

    /**
     * Sends the notification text to every patient, attempting all of them even when some fail.
     *
     * <p>Failures are collected and reported together once the loop has finished, so one
     * unreachable patient no longer prevents the remaining patients from being messaged.
     *
     * @throws RuntimeException if at least one send failed; each individual failure is
     *                          attached as a suppressed exception
     */
    private void sendToAllPatients(Notification notification) {
        RuntimeException failure = null;
        for (String uid : notification.getPatients()) {
            try {
                UserInfo userInfo = dpAuthService.getUserInfo(uid);
                dpSmsApiService.sendSms(userInfo.getPhoneNumber(), notification.getContent());
            } catch (Exception e) {
                if (failure == null) {
                    failure = new RuntimeException(
                            "Failed to send notification " + notification.getId()
                                    + " to one or more patients");
                }
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
Java
복사

3. v1.1.0

스프링 스케줄러와 스프링 배치를 사용해서 로직을 재구성하였습니다.
NotificationModule.java
@Slf4j @RequiredArgsConstructor @Component public class NotificationModule { private final NotificationMapper notificationMapper; private final JobLauncher jobLauncher; @Qualifier("notificationTriggerJob") private final Job notificationTriggerJob; @Qualifier("notificationDispatcherJob") private final Job notificationDispatcherJob; private final TimeModule timeModule; // 1분마다 실행해야 하는 Notification이 없는지 확인한다 @Scheduled(cron = "${schedule.cron.reward.publish}") public void notificationTrigger() { log.info("Start notificationTrigger()"); LocalDateTime now = timeModule.now(); log.info("Test dayOfWeek, hour, minute : {}, {}, {}", now.getDayOfWeek().getValue(), now.getHour(), now.getMinute()); List<Notification> notifications = notificationMapper.findAllScheduledNotifications( now.getDayOfWeek().getValue(), now.getHour(), now.getMinute()); log.info("notificationId : {}", notifications.get(0).getId()); // notifcations이 비어있지 않다면 현재 실행해야 하는 Notification이 있다고 가정합니다. if (!notifications.isEmpty()) { // notifications을 순회하며 각각의 Notification에 스프링 배치 작업을 진행합니다. 
notifications.stream() .filter(Notification::isEnabled) .forEach(notification -> { try { log.info("Start for-each : {}", notification.getId()); JobParameters jobParameters = new JobParametersBuilder() .addLong("notificationId", notification.getId()) .toJobParameters(); jobLauncher.run(notificationTriggerJob, jobParameters); } catch (Exception e) { log.error("Notification Trigger Error", e); } }); } } // 1분마다 실행하여 전송해야 하는 Notification이 없는지 확인한다 @Scheduled(cron = "0 */1 * * * *") public void notificationDispatcher() { List<Notification> notifications = notificationMapper.findAllUncheckedNotifications(); if (!notifications.isEmpty()) { notifications.stream() .forEach(notification -> { try { JobParameters jobParameters = new JobParametersBuilder() .addLong("notificationId", notification.getId()) .addLong("staffId", notification.getStaffId()) .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(notificationDispatcherJob, jobParameters); } catch (Exception e) { log.error("Notification Dispatcher Error", e); } }); } } }
Java
복사
NotificationDispatcherConfig.java
@Slf4j @RequiredArgsConstructor @Configuration public class NotificationDispatcherConfig { private final DataSource dataSource; @Bean public Job notificationDispatcherJob(final JobRepository jobRepository, final Step notificationDispatcherStep) { log.info("Start notificationDispatcherJob()"); return new JobBuilder("notificationDispatcherJob") .repository(jobRepository) .start(notificationDispatcherStep) .build(); } @Bean @JobScope public Step notificationDispatcherStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Value("#{jobParameters['notificationId']}") Long notificationId, @Value("#{jobParameters['staffId']}") Long staffId) { log.info("Start notificationDispatcherStep()"); return new StepBuilder("notificationDispatcherStep") .<String, String>chunk(10) .reader(notificationDispatcherReader(staffId)) .processor(notificationDispatcherProcessor(notificationId)) .writer(notificationDispatcherWriter(notificationId)) .repository(jobRepository) .transactionManager(transactionManager) .build(); } @Bean @StepScope public JdbcCursorItemReader<String> notificationDispatcherReader( @Value("#{jobParameters['staffId']}") Long staffId) { log.info("Start notificationDispatcherReader()"); return new JdbcCursorItemReaderBuilder<String>() .name("notificationDispatcherReader") .dataSource(dataSource) .sql( "SELECT UID FROM TB_PSCT_CODE WHERE stff_id = ?") .preparedStatementSetter(ps -> ps.setLong(1, staffId)) .rowMapper((rs, rowNum) -> rs.getString("UID")) .build(); } @Bean @StepScope public ItemProcessor<String, String> notificationDispatcherProcessor( @Value("#{jobParameters['notificationId']}") Long notificationId) { log.info("Start notificationDispatcherProcessor()"); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); return pid -> { pid = pid.trim(); // 굳이 기록하지 않더라도 타겟에 맞는 사람만 전송하면 될듯? 
if (!pid.isEmpty()) { jdbcTemplate.update( "INSERT INTO TB_PATIENT_NOTIFICATION_1 (notification_id, patient_uid) VALUES (?, ?)", notificationId, pid); // Send 로직 추가 // 만약 여기에서 실패하는 친구가 생긴다면 어떻게 처리할 것인가? } return pid; }; } // ItemReader에 의해 조회된 Notification 객체들이 Writer로 전달된다. @Bean @StepScope public ItemWriter<String> notificationDispatcherWriter( @Value("#{jobParameters['notificationId']}") Long notificationId) { JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); return pids -> { pids.stream().filter(pid -> !pid.isEmpty()).forEach(pid -> { jdbcTemplate.update( "INSERT INTO tb_notification_log_1 (notification_id, patient_uid, is_succeed, send_date) VALUES (?, ?, ?, ?)", ps -> { ps.setLong(1, notificationId); ps.setString(2, pid); ps.setBoolean(3, true); // status ps.setTimestamp(4, Timestamp.valueOf(LocalDateTime.now())); // created_at }); }); }; } }
Java
복사
NotificationTriggerConfig.java
@Slf4j @RequiredArgsConstructor @Configuration public class NotificationTriggerConfig { private final DataSource dataSource; @Bean public Job notificationTriggerJob(final JobRepository jobRepository, final Step notificationTriggerStep) { log.info("Start notificationTriggerJob()"); return new JobBuilder("notificationTriggerJob") .repository(jobRepository) .start(notificationTriggerStep) .build(); } @Bean @JobScope public Step notificationTriggerStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Value("#{jobParameters['notificationId']}") Long notificationId) { log.info("Start notificationTriggerStep()"); return new StepBuilder("notificationTriggerStep") // <T1, T2> 에서 T1은 Reader에서 반환하는 타입, T2는 프로세서에서 처리한 후 Writer 에게 전달하는 타입 // 추가적으로 chunk size는 page size와 동일하게 가져가는 것이 좋다 .<Notification, Notification>chunk(10) .reader(notificationTriggerReader(notificationId)) .processor(notificationTriggerProcessor()) .writer(notificationTriggerWriter()) .repository(jobRepository) .transactionManager(transactionManager) .build(); } @Bean @StepScope public JdbcCursorItemReader<Notification> notificationTriggerReader( @Value("#{jobParameters['notificationId']}") Long notificationId) { log.info("Start notificationTriggerReader()"); return new JdbcCursorItemReaderBuilder<Notification>() .name("notificationTriggerReader") .dataSource(dataSource) .sql( "SELECT id, staff_id, message, target FROM tb_notification_3 WHERE id = ?") .preparedStatementSetter(ps -> ps.setLong(1, notificationId)) .rowMapper(new RowMapper<Notification>() { @Override public Notification mapRow(ResultSet rs, int rowNum) throws SQLException { Notification notification = new Notification(); notification.setId(rs.getLong("id")); notification.setStaffId(rs.getLong("staff_id")); notification.setMessage(rs.getString("message")); notification.setTarget(rs.getString("target")); return notification; } }) .build(); } @Bean @StepScope public ItemProcessor<Notification, Notification> 
notificationTriggerProcessor() { log.info("Start ItemProcessor()"); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); return notification -> { log.info("Start update modDate"); jdbcTemplate.update( "UPDATE TB_NOTIFICATION_3 SET MOD_DATE = NOW() WHERE ID = ?", notification.getId()); return notification; }; } // ItemReader에 의해 조회된 Notification 객체들이 Writer로 전달된다. @Bean public ItemWriter<Notification> notificationTriggerWriter() { log.info("Start ItemWriter()"); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); return notifications -> { for (Notification notification : notifications) { jdbcTemplate.update( "INSERT INTO tb_notification_4 (notification_id, staff_id, message, target, is_checked) VALUES (?, ?, ?, ?, ?)", notification.getId(), notification.getStaffId(), notification.getMessage(), notification.getTarget(), false); } }; } }
Java
복사