본문 바로가기
카테고리 없음

230414 TIL

by hbIncoding 2023. 4. 14.

1.  Spring batch 

  • 책 주문이 쌓이면 02시에 전날 02시 부터 쌓였던 주문들을 탐색하고 주문을 완료시킨다(취소 못하게 한다)
  • 해당 주문들을 통해 어떤 책이 얼마나 팔렸는지 분석하여 ranking을 만들고 redis 서버에 rank 키 이름으로 올린다.
  • 만약 불의의 사고로 redis서버의 데이터가 날아간 경우 서비스 단에서 redis에 랭킹을 다시 만들어 준다.
    • 이것은 아래에 더 자세히 설명할 것이다.

2.  작성한 코드 

 1)BatchScheduler

@Slf4j
@Component
public class BatchScheduler {

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private FakejumoonConfig fakejumoonConfig;

	@Autowired
	private CycleConfig cycleConfig;

	//2시 5분에 더미 주문들을 생성한다.
	@Scheduled(cron = "0 5 2 * * *")
	public void runJob(){

		Map<String, JobParameter> confMap = new HashMap<>();
		confMap.put("time", new JobParameter(System.currentTimeMillis()));
		JobParameters jobParameters = new JobParameters(confMap);

		try{
			jobLauncher.run(fakejumoonConfig.job(), jobParameters);
		}catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException |
				JobParametersInvalidException
				| org.springframework.batch.core.repository.JobRestartException e){
			log.error(e.getMessage());
		}
	}
	//2시에 주문들을 완료시키고 랭킹을 재정리한다.
	@Scheduled(cron = "0 0 2 * * *")
	public void resetAndRank(){

		Map<String, JobParameter> confMap = new HashMap<>();
		confMap.put("time", new JobParameter(System.currentTimeMillis()));
		JobParameters jobParameters = new JobParameters(confMap);

		try{
			jobLauncher.run(cycleConfig.jobs(), jobParameters);
		}catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException | JobParametersInvalidException
				| org.springframework.batch.core.repository.JobRestartException e){
			log.error(e.getMessage());
		}
	}
}

2)랭킹 리셋

@Slf4j
@Configuration
@RequiredArgsConstructor
public class CycleConfig {

	private final JobBuilderFactory jobBuilderFactory;
	private final StepBuilderFactory stepBuilderFactory;
	private final BookRepository bookRepository;
	private final JumoonRepository jumoonRepository;
	private final RedisTemplate redisTemplate;

	@Bean
	public Job jobs() {
		Job job = jobBuilderFactory.get("job")
			.start(StepA())
			.on("FAILED")
			.end()
			.from(StepA())
			.on("*")
			.to(StepB())
			.on("FAILED")
			.end()
			.from(StepB())
			.on("*")
			.to(StepC())
			.on("FAILED")
			.end()
			.from(StepC())
			.on("*")
			.to(StepD())
			.end()
			.build();
		return job;
	}

	List<Long> bookidkeys = new ArrayList<>();

	List<Jumoon> jumoons = new ArrayList<>();

	@Bean
	public Step StepA() {
		return stepBuilderFactory.get("StepA")
			.tasklet((contribution, chunkContext) -> {
				log.info("Step1. Jumoontable에서 fine가 false인 책들만 가져오기");
                
				jumoons = jumoonRepository.findAllByFine(false);
				return RepeatStatus.FINISHED;
			})
			.build();
	}

	@Bean
	public Step StepB() {
		return stepBuilderFactory.get("StepB")
			.tasklet((contribution, chunkContext) -> {
				log.info("Step2. 어떤 책이 얼마나 팔렸나 계산");

				//어떤 책이 얼마나 팔렸는지 map으로 저장
				Map<Long, Integer> soldbooks = new HashMap<>();
				for(Jumoon i : jumoons){
					Long bookid = i.getBook().getId();
					Integer quantity = i.getQuantity();
					if (soldbooks.containsKey(bookid)){
						soldbooks.put(bookid,(soldbooks.get(bookid)+quantity));
					}else {
						soldbooks.put(bookid,quantity);
					}
				}

				//map의 value 순서대로 정렬하기
				bookidkeys = new ArrayList<>(soldbooks.keySet());
				Collections.sort(bookidkeys, ((o1, o2) -> (soldbooks.get(o2).compareTo(soldbooks.get(o1)))));

				//어제 주문된 책 종류가 8개 미만인 경우 100,101,102... 순으로 책을 채워준다.
				Long cnt = 100L;
				while (bookidkeys.size()<8){
					bookidkeys.add(cnt);
					soldbooks.put(cnt,0);
					cnt++;
				}

				//이제 랭크 순서대로 넣어준다. "rank" : {"1,9", "2,8"....} 책id와 판매량은 쉼표로 구분하고 각각 리스트의 원소로 넣자
				List<String> bookrankAndsold = new ArrayList<>();
				for (int i = 0; i < 8; i++) {
					String rankvalue = bookidkeys.get(i)+","+soldbooks.get(bookidkeys.get(i));
					bookrankAndsold.add(rankvalue);
				}

				
				ValueOperations<String, List<String>> values = redisTemplate.opsForValue();
				//레디스 초기화 이후
				redisTemplate.getConnectionFactory().getConnection().flushAll();
				//만든 랭킹을 redis에 넣어주기
				values.set("rank",bookrankAndsold);

				return RepeatStatus.FINISHED;
			})
			.build();
	}

	@Bean
	public Step StepC() {
		return stepBuilderFactory.get("StepC")
			.tasklet((contribution, chunkContext) -> {
				log.info("Step3. 오늘 주문들 다 endFine로 true로 바꿔주기");

				for(Jumoon i : jumoons){
					i.endFine();
				}

				return RepeatStatus.FINISHED;
			})
			.build();
	}

	@Bean
	public Step StepD() {
		return stepBuilderFactory.get("StepD")
			.tasklet((contribution, chunkContext) -> {
				log.info("Step4. 재고량 채워 주기");

				for (Long i: bookidkeys){
					Book book = bookRepository.findById(i).orElseThrow(
						()->new CustomException(ErrorCode.BOOK_NOT_FOUND)
					);
					if(book.getOutOfPrint() != 1){ // 절판이 아닐 때만 재고를 올려줌
						if(book.getInventory() == 0){ // 기존 상태가 품절이었으면 modifiedAt까지 함께 변경
							book.orderbook(20L);
						}else {
							book.batchBook(20L); // 품절이 아니었으면 재고만 변경
						}
					}
				}
				return RepeatStatus.FINISHED;
			})
			.build();
	}

}

3)가짜 주문 생성

@Slf4j
@Configuration
@RequiredArgsConstructor
public class FakejumoonConfig {

	private final JobBuilderFactory jobBuilderFactory;
	private final StepBuilderFactory stepBuilderFactory;
	private final BookRepository bookRepository;
	private final MemberRepository memberRepository;
	private final JumoonService jumoonService;

	@Bean
	public Job job() {
		Job job = jobBuilderFactory.get("job")
			.start(stepNextConditionalStepA())
			.on("FAILED")
			.end()
			.from(stepNextConditionalStepA())
			.on("*")
			.to(stepNextConditionalStepB())
			.on("FAILED")
			.end()
			.from(stepNextConditionalStepB())
			.on("*")
			.to(stepNextConditionalStepC())
			.on("*")
			.end()
			.end()
			.build();
		return job;
	}

	@Bean
	public Step stepNextConditionalStepA() {
		return stepBuilderFactory.get("stepNextConditionalStepA")
			.tasklet((contribution, chunkContext) -> {
				log.info(">>>>> 가짜 주문 생성에 들어갑니다");
				return RepeatStatus.FINISHED;
			})
			.build();
	}

	@Bean
	public Step stepNextConditionalStepB() {
		return stepBuilderFactory.get("stepNextConditionalStepB")
			.tasklet((contribution, chunkContext) -> {
				int num = 1001;
                int bookorder = 9;
                
				//1001번부터 1007번까지 생성
				for (int i = num; i < num+8; i++) {
					Member member = memberRepository.findById(Long.valueOf(i)).orElseThrow(
						()-> new CustomException(ErrorCode.MEMBER_NOT_FOUND)
					);

					Book book = bookRepository.findById(Long.valueOf(i)).orElseThrow(
						()-> new CustomException(ErrorCode.BOOK_NOT_FOUND)
					);

					jumoonService.fakebookorder(book,member,(bookorder--));
				}
				return RepeatStatus.FINISHED;
			})
			.build();
	}

	@Bean
	public Step stepNextConditionalStepC() {
		return stepBuilderFactory.get("stepNextConditionalStepC")
			.tasklet((contribution, chunkContext) -> {
				log.info(">>>>> 가짜 주문 생성이 끝났습니다");
				return RepeatStatus.FINISHED;
			})
			.build();
	}

}

3.  위와 같이 batch를 작성한 이유

 1) 주문 데이터의 중요성과 DB 접근성

  • DB에 자주 접근하는 것은 좋지않다. 따라서 오늘 주문 데이터를 Redis에서 저장하고 있다가 02시에 일괄적으로 DB에 저장하는 방법을 구상했다.
  • 하지만 주문 데이터는 서비스에서 매우 중요하기 때문에 휘발성이 높은 Redis에 저장해 두는 것은 좋지 않다.
  • 따라서 redis에는 가장 많이 찾게 되는 main 페이지에서 어제 팔린 책들에 대한 ranking만 넣어 두는 것으로 설계했다.

2) 예외 처리

  • 현재는 가짜 주문을 추가하여 어제의 bestseller 1~8등까지 들어가나, 실제 서비스에서는 그럴 수 없다.
  • 따라서 어제 팔린 책 종류가 8권 미만인 경우 부족한 책들을 채워 주도록 구상하였다.

4.  batch step 예외 처리 방법

  • 배치는 작게 step이라는 단계와 이 step들을 실행시키는 job으로 나누어져 있다.
  • 어떤 step들을 어떤 순서로 진행시킬지와 step별로 에러 발생시 어떻게 할지 짤 수 있다.
	@Bean
	public Job job() {
		Job job = jobBuilderFactory.get("job")
			.start(stepNextConditionalStepA())
			.on("FAILED")
			.end()
			.from(stepNextConditionalStepA())
			.on("*")
			.to(stepNextConditionalStepB())
			.on("FAILED")
			.end()
			.from(stepNextConditionalStepB())
			.on("*")
			.to(stepNextConditionalStepC())
			.on("*")
			.end()
			.end()
			.build();
		return job;
	}

 

  • 위와 같을 때 크게 "FAILED" 와 "*" 이 중요하다.
  • StepB가 실패하면 end() 즉 job은 종료되며, failed이외에 모든 경우인 "*"가 발생한 경우에는 다음 stepC로 넘어간다.
  • 위와 같이 step별로 시나리오를 생각하여 상황별 예외처리를 해줄 수 도 있다.

5.  batch 와 Transaction

  • 앞서 언급하였지만 Redis서버에서 Ranking 데이터가 증발하게 된다면, service 로직에서 이것을 감지하고 어제 주문 데이터들을 조회해서 새롭게 만들어 주는 로직을 구상했다.

 1)메인 페이지에서 어제의 bestSeller 조회

	@Transactional
	public List<BookRankDto> bookList() {

		List<BookRankDto> bookList = new ArrayList<>();
		ValueOperations<String, List<String>> values = redisTemplate.opsForValue();

		if (values.get("rank") == null) {

			//일단 랭킹을 만들어 달라고 하자
			//오래 걸릴 수 도 있으닌까 일단 아무거내 내뱉고 다음에 얻어가자
			try {
				summonRank();
			}catch (Exception e){
			}

			Long random;
			Random r = new Random();

			for (int i = 0; i < 8; i++) {
				random = (long)r.nextInt(4000000);
				Optional<Book> book = bookRepository.findById(random);
				if (book.isPresent()) {
					bookList.add(new BookRankDto(book.get(), 0,String.valueOf(bookList.size()+1)));
				} else
					i--;
			}
		} else {
			List<String> topbooks = values.get("rank");
			for (String i : topbooks) {
				Long bookid = Long.valueOf(i.split(",")[0]);
				Integer booksold = Integer.valueOf(i.split(",")[1]);

				Optional<Book> book = bookRepository.findById(bookid);
				if(booksold==0){
					bookList.add(new BookRankDto(book.get(),booksold,"추천 도서"));

				}else
				bookList.add(new BookRankDto(book.get(),booksold,String.valueOf(bookList.size()+1)+"등"));
			}
		}

		return bookList;
	}

 

2)랭킹을 만들어주는 summonRank()

private void summonRank(){

		List<Long> bookidkeys = new ArrayList<>();
		List<Jumoon> jumoons = new ArrayList<>();

		LocalDateTime now = LocalDateTime.now();

		LocalDateTime starttime;
		LocalDateTime endtime;

		if (now.getHour()>=2){
			LocalDateTime ytd = now.minusDays(1);
			starttime = LocalDateTime.of(ytd.getYear(),ytd.getMonth(), ytd.getDayOfMonth(), ytd.getHour(), 0);
			endtime = LocalDateTime.of(now.getYear(),now.getMonth(),now.getDayOfMonth(), now.getHour(), 0);
		}else {
			LocalDateTime yytd = now.minusDays(2);
			LocalDateTime ytd = now.minusDays(1);
			starttime = LocalDateTime.of(yytd.getYear(),yytd.getMonth(), yytd.getDayOfMonth(), yytd.getHour(), 0);
			endtime = LocalDateTime.of(ytd.getYear(),ytd.getMonth(), ytd.getDayOfMonth(), ytd.getHour(), 0);
		}
		jumoons = jumoonRepository.findAllByJumoonatBetween(starttime,endtime);

		//어떤 책이 얼마나 팔렸는지 map으로 저장
		Map<Long, Integer> soldbooks = new HashMap<>();
		for(Jumoon i : jumoons){
			Long bookid = i.getBook().getId();
			Integer quantity = i.getQuantity();

			if (soldbooks.containsKey(bookid)){
				soldbooks.put(bookid,(soldbooks.get(bookid)+quantity));
			}else {
				soldbooks.put(bookid,quantity);
			}
		}

		//map의 value 순서대로 정렬하기
		bookidkeys = new ArrayList<>(soldbooks.keySet());
		Collections.sort(bookidkeys, ((o1, o2) -> (soldbooks.get(o2).compareTo(soldbooks.get(o1)))));

		//랜덤으로 책 채워주기
		Long random;
		Random r = new Random();
		while (bookidkeys.size()<8){
			random = (long)r.nextInt(4000000);
			bookidkeys.add(random);
			soldbooks.put(random,0);
		}

		//이제 랭크 순서대로 넣어준다. "rank" : {"1,9", "2,8"....} 책id와 판매량은 쉼표로 구분하고 각각 리스트의 원소로 넣자
		List<String> bookrankAndsold = new ArrayList<>();
		for (int i = 0; i < 8; i++) {
			String rankvalue = bookidkeys.get(i)+","+soldbooks.get(bookidkeys.get(i));
			bookrankAndsold.add(rankvalue);
		}

		//rankvalue를 redis에 저장시키기
		ValueOperations<String, List<String>> values = redisTemplate.opsForValue();
		//레디스 초기화 이후
		redisTemplate.getConnectionFactory().getConnection().flushAll();
		//랭크값 넣어주기
		values.set("rank",bookrankAndsold);


	}

3)위와 같이 구상한 이유

  • 우선 summonRank()를 try catch에 넣은 이유는 redis서버에 문제가 생기면 랭킹 만들기가 실패하기 때문에 try안에 넣었다.
  • 그리고 해당 과정이 어제 주문 수에 따라 오래 걸릴 수 있기 때문에 우선 랜덤으로 뱉어 주었다.
  • summonRank()라는 복잡한 메서드가 service에 들어가 있는데, batch 로 빼주고 싶었으나 불가능하였다.
    • @Transaction 안에 batch를 넣을 수 가 없다. Jobrepository의 기본 설정이 외부 트랜잭션을 허용하지 않기 때문이다.
    • batch 내부에서 @Transactional을 붙인 메서드를 실행하기 때문인데 만약 더 큰 Transaction안에 들어가 있다면 내부 transaction이 끝나도 커밋시점이 도래하지 않기 때문이다.
    • 해결 방법으로는 TransactionTemplate가 새로운 트랜잭션을 생성하도록 강제하여 외부 트랜잭션과 별개로 커밋하게 만들어 데드락 문제를 해결할 수 있다. 하지만 외부 트랜잭션과 내부 트랜잭션의 StepExecution 상태의 일관성이 깨질 수 있게 될 수 도 있다.
    • 따라서 Job 실행 외부에서 임의로 트랜잭션을 시작하지 못하도록 한 것이다.
  • 정리
    • 스프링 배치는 Job과 Step의 실행 상태(JobInstance, JobExcution, StepExcution)를 관리하는 JobRepository가 있다. JobRepository의 오퍼레이션에 스프링 배치 외부의 트랜잭션 개입을 막기 위해 Job 실행 시점에 외부에 기존 트랜잭션이 열려있는지를 검사한다. 만약 트랜잭션이 활성화 돼있다면 익셉션을 던져 Job 실행을 중단한다.
    • 아래 참조를 참고하면 좋을 것이다. 다만 매우 깊고 어려운 내용이다.

 

6.  참조

1)배치와 트랜잭션을 함께 사용하지 못하는 이유 : https://brunch.co.kr/@anonymdevoo/50

 

Spring batch와 @Transactional

[BATCH-1668] added check for transaction | 본 글의 설명과 코드는 spring-batch의 4.3.x 버전을 기준으로 한다. 출처: spring-batch/4.3.x spring-batch(이하 스프링 배치)의 Job을 트랜잭션 안에서 실행하게 되면 아래와

brunch.co.kr