Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ class CollectByRssReaderUseCase(
val failures = AtomicInteger(0)

val futures = blogs.map { blog ->
CompletableFuture.supplyAsync({ fetchSingleBlog(blog, failures) }, fetchExecutor)
.whenComplete { _, ex ->
log.info("Future done blogId={}", blog.id)
}
CompletableFuture.supplyAsync(
{ fetchSingleBlog(blog, failures) },
fetchExecutor
)
}

CompletableFuture.allOf(*futures.toTypedArray()).join()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class BatchSchedulesConfig {
@Bean("schedulesTaskScheduler")
fun schedulesTaskScheduler(): ThreadPoolTaskScheduler {
val scheduler = ThreadPoolTaskScheduler()
scheduler.poolSize = 4
scheduler.threadNamePrefix = "schedules-"
scheduler.poolSize = 2
scheduler.threadNamePrefix = "publish-msg-"
scheduler.setWaitForTasksToCompleteOnShutdown(true)
scheduler.setAwaitTerminationSeconds(30)
scheduler.setAwaitTerminationSeconds(15)
scheduler.setErrorHandler { ex ->
log.error("[BatchSchedulesConfig] Scheduled task execution failed", ex)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package site.techmoa.batch.schedules.dto

data class NewArticleDto(
val articleId: Long,
val title: String,
val link: String,
val pubDate: Long,
val blogId: Long,
val blogName: String,
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener
import site.techmoa.batch.schedules.annotation.EventHandler
import site.techmoa.batch.schedules.repository.OutboxRepository
import site.techmoa.domain.event.NewArticlesEvents
import site.techmoa.domain.event.OutboxMessages

@EventHandler
class NewArticlesEventHandler(
Expand All @@ -15,7 +15,7 @@ class NewArticlesEventHandler(

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
@Transactional(propagation = Propagation.MANDATORY)
fun recordMessage(event: NewArticlesEvents) {
outboxRepository.save(event)
fun recordMessage(messages: OutboxMessages) {
outboxRepository.save(messages)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,34 @@ package site.techmoa.batch.schedules.repository
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.jdbc.core.RowMapper
import org.springframework.stereotype.Repository
import site.techmoa.domain.model.Article
import site.techmoa.batch.schedules.dto.NewArticleDto

@Repository(value = "SchedulesArticleRepository")
class ArticleRepository(
private val jdbcTemplate: JdbcTemplate
) {
private val rowMapper = RowMapper { rs, _ ->
Article(
id = rs.getLong("article_id"),
blogId = rs.getLong("blog_id"),
title = rs.getString("title"),
link = rs.getString("link"),
guid = rs.getString("guid"),
pubDate = rs.getLong("pub_date"),
views = rs.getInt("views")
)
}

fun findByIdGreaterThan(articleId: Long): List<Article> {
fun findByIdGreaterThan(articleId: Long): List<NewArticleDto> {
val sql = """
SELECT article_id, blog_id, title, link, guid, pub_date, views
FROM article
WHERE article_id > ?
SELECT a.article_id, a.blog_id, a.title, a.link, a.pub_date, b.name
FROM article a
LEFT JOIN blog b on b.blog_id = a.blog_id
WHERE a.article_id > ?;
""".trimIndent()

return jdbcTemplate.query(
sql,
rowMapper,
RowMapper { rs, _ ->
NewArticleDto(
articleId = rs.getLong("article_id"),
title = rs.getString("title"),
link = rs.getString("link"),
pubDate = rs.getLong("pub_date"),
blogId = rs.getLong("blog_id"),
blogName = rs.getString("name"),
)
},
articleId
)
}

fun findByBlogIdAndGuid(blogId: Long, guid: String): Article? {
val sql = """
SELECT article_id, blog_id, title, link, guid, pub_date, views
FROM article
WHERE blog_id = ?
AND guid = ?
LIMIT 1
""".trimIndent()

val result = jdbcTemplate.query(
sql,
rowMapper,
blogId,
guid
)

return result.firstOrNull()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class LastScannedArticleRepository(
private val jdbcTemplate: JdbcTemplate
) {

fun findBy(publishNotificationJob: String): LastScannedArticleDto? {
fun findBy(jobName: String): LastScannedArticleDto? {
val sql = """
SELECT last_scanned_id
FROM scheduler_scan_support
Expand All @@ -23,7 +23,7 @@ class LastScannedArticleRepository(
val lastScannedId = jdbcTemplate.queryForObject(
sql,
Long::class.java,
publishNotificationJob
jobName
)
lastScannedId?.let { LastScannedArticleDto(it) }
} catch (_: EmptyResultDataAccessException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package site.techmoa.batch.schedules.repository
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.jdbc.core.RowMapper
import org.springframework.stereotype.Repository
import site.techmoa.domain.event.NewArticlesEvents
import org.springframework.transaction.annotation.Transactional
import site.techmoa.domain.event.EventType
import site.techmoa.domain.event.OutboxMessages
import site.techmoa.domain.event.OutboxMessages.NewArticlesOutboxMessage
import site.techmoa.domain.event.OutboxStatus
import java.sql.Timestamp
import java.time.Instant

@Repository(value = "SchedulesOutboxRepository")
class OutboxRepository(
private val jdbcTemplate: JdbcTemplate
private val jdbcTemplate: JdbcTemplate,
) {

data class OutboxMessage(
Expand All @@ -20,99 +23,129 @@ class OutboxRepository(
val idempotencyKey: String,
)

fun save(event: NewArticlesEvents) {
val now = Timestamp.from(Instant.now())
val sql = """
INSERT INTO article_created_outbox (
blog_id,
guid,
idempotency_key,
status,
published_at,
last_error_message,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""".trimIndent()

jdbcTemplate.batchUpdate(
sql,
event.events.map { e ->
arrayOf(
e.article.blogId,
e.article.guid,
e.idempotencyKey,
OutboxStatus.PENDING.name,
null,
null,
now,
now
)
}
)
}

fun findPending(): List<OutboxMessage> {
fun save(messages: OutboxMessages) {
val sql = """
SELECT article_created_outbox_id, blog_id, guid, idempotency_key
FROM article_created_outbox
WHERE status = ?
ORDER BY created_at ASC
""".trimIndent()
INSERT INTO outbox_messages (
event_type,
aggregate_type,
aggregate_id,
idempotency_key,
payload,
payload_type,
status
) VALUES (?, ?, ?, ?, ?, ?, ?)
""".trimIndent()

val rowMapper = RowMapper { rs, _ ->
OutboxMessage(
id = rs.getLong("article_created_outbox_id"),
blogId = rs.getLong("blog_id"),
guid = rs.getString("guid"),
idempotencyKey = rs.getString("idempotency_key")
val args: List<Array<Any?>> = messages.messages.map { e ->
arrayOf(
e.eventType.name,
e.aggregateType,
e.aggregateId,
e.idempotencyKey,
e.payload,
e.payloadType,
e.status.name,
)
}

return jdbcTemplate.query(
sql,
rowMapper,
OutboxStatus.PENDING.name,
)
jdbcTemplate.batchUpdate(sql, args)
}

fun markPublished(outboxId: Long) {
@Transactional
fun markSuccess(outboxId: Long) {
val now = Timestamp.from(Instant.now())
val sql = """
UPDATE article_created_outbox
UPDATE outbox_messages
SET status = ?,
published_at = ?,
last_error_message = NULL,
updated_at = ?
WHERE article_created_outbox_id = ?
last_error_message = NULL
WHERE outbox_message_id = ?
""".trimIndent()

jdbcTemplate.update(
sql,
OutboxStatus.PUBLISHED.name,
now,
OutboxStatus.SUCCESS.name,
now,
outboxId
)
}

@Transactional
fun markFailed(outboxId: Long, errorMessage: String) {
val now = Timestamp.from(Instant.now())
val sql = """
UPDATE article_created_outbox
UPDATE outbox_messages
SET status = ?,
published_at = NULL,
last_error_message = ?,
updated_at = ?
WHERE article_created_outbox_id = ?
last_error_message = ?
WHERE outbox_message_id = ?;
""".trimIndent()

jdbcTemplate.update(
sql,
OutboxStatus.FAILED.name,
errorMessage,
now,
outboxId
)
}

@Transactional
fun claimPending(batchSize: Int): List<NewArticlesOutboxMessage> {
val pendingMessages = findPending(batchSize)
markPublishing(pendingMessages.map { it.outboxMessageId })
return pendingMessages
}

private fun findPending(batchSize: Int): List<NewArticlesOutboxMessage> {
val sql = """
SELECT
outbox_message_id,
event_type,
aggregate_type,
aggregate_id,
idempotency_key,
payload,
payload_type,
status,
created_at
FROM outbox_messages
WHERE status = 'PENDING'
ORDER BY created_at ASC
limit $batchSize
FOR UPDATE SKIP LOCKED;
""".trimIndent()

val rowMapper = RowMapper { rs, _ ->
NewArticlesOutboxMessage(
outboxMessageId = rs.getLong("outbox_message_id"),
eventType = EventType.valueOf(rs.getString("event_type")),
aggregateType = rs.getString("aggregate_type"),
aggregateId = rs.getLong("aggregate_id"),
idempotencyKey = rs.getString("idempotency_key"),
payload = rs.getString("payload"),
payloadType = rs.getString("payload_type"),
status = OutboxStatus.valueOf(rs.getString("status"))
)
}

return jdbcTemplate.query(sql, rowMapper)
}

private fun markPublishing(outboxIds: List<Long>) {
val now = Timestamp.from(Instant.now())
val sql = """
UPDATE outbox_messages
SET status = 'PUBLISHING',
published_at = ?,
last_error_message = NULL
WHERE outbox_message_id = ?
AND status = 'PENDING'
""".trimIndent()

val args = outboxIds.map { id ->
arrayOf(now, id)
}

jdbcTemplate.batchUpdate(sql, args)
}
}
Loading