{"url":"https://api.github.com/repos/apache/spark/pulls/28904","id":438397872,"node_id":"MDExOlB1bGxSZXF1ZXN0NDM4Mzk3ODcy","html_url":"https://github.com/apache/spark/pull/28904","diff_url":"https://github.com/apache/spark/pull/28904.diff","patch_url":"https://github.com/apache/spark/pull/28904.patch","issue_url":"https://api.github.com/repos/apache/spark/issues/28904","number":28904,"state":"closed","locked":false,"title":"[SPARK-30462][SS] Streamline the logic on file stream source and sink metadata log to avoid memory issue","user":{"login":"HeartSaVioR","id":1317309,"node_id":"MDQ6VXNlcjEzMTczMDk=","avatar_url":"https://avatars.githubusercontent.com/u/1317309?v=4","gravatar_id":"","url":"https://api.github.com/users/HeartSaVioR","html_url":"https://github.com/HeartSaVioR","followers_url":"https://api.github.com/users/HeartSaVioR/followers","following_url":"https://api.github.com/users/HeartSaVioR/following{/other_user}","gists_url":"https://api.github.com/users/HeartSaVioR/gists{/gist_id}","starred_url":"https://api.github.com/users/HeartSaVioR/starred{/owner}{/repo}","subscriptions_url":"https://api.github.com/users/HeartSaVioR/subscriptions","organizations_url":"https://api.github.com/users/HeartSaVioR/orgs","repos_url":"https://api.github.com/users/HeartSaVioR/repos","events_url":"https://api.github.com/users/HeartSaVioR/events{/privacy}","received_events_url":"https://api.github.com/users/HeartSaVioR/received_events","type":"User","site_admin":false},"body":"### What changes were proposed in this pull request?\r\n\r\nIn many operations on CompactibleFileStreamLog reads a metadata log file and materializes all entries into memory. As the nature of the compact operation, CompactibleFileStreamLog may have a huge compact log file with bunch of entries included, and for now they're just monotonically increasing, which means the amount of memory to materialize also grows incrementally. This leads pressure on GC.\r\n\r\nThis patch proposes to streamline the logic on file stream source and sink whenever possible to avoid memory issue. To make this possible we have to break the existing behavior of excluding entries - now the `compactLogs` method is called with all entries, which forces us to materialize all entries into memory. This is hopefully no effect on end users, because only file stream sink has a condition to exclude entries, and the condition has been never true. (DELETE_ACTION has been never set.)\r\n\r\nBased on the observation, this patch also changes the existing UT a bit which simulates the situation where \"A\" file is added, and another batch marks the \"A\" file as deleted. This situation simply doesn't work with the change, but as I mentioned earlier it hasn't been used. (I'm not sure the UT is from the actual run. I guess not.)\r\n\r\n### Why are the changes needed?\r\n\r\nThe memory issue (OOME) is reported by both JIRA issue and user mailing list.\r\n\r\n### Does this PR introduce _any_ user-facing change?\r\n\r\nNo.\r\n\r\n### How was this patch tested?\r\n\r\n* Existing UTs\r\n* Manual test done\r\n\r\nThe manual test leverages the simple apps which continuously writes the file stream sink metadata log.\r\n\r\nhttps://github.com/HeartSaVioR/spark-delegation-token-experiment/commit/bea7680e4c588f455f8c3181a96c9eff5002fa1a\r\n\r\nThe test is configured to have a batch metadata log file at 1.9M (10,000 entries) whereas other Spark configuration is set to the default. (compact interval = 10) The app runs as driver, and the heap memory on driver is set to 3g.\r\n\r\n> before the patch\r\n\r\n\r\n\r\nIt only ran for 40 mins, with the latest compact batch file size as 1.3G. The process struggled with GC, and after some struggling, it threw OOME.\r\n\r\n> after the patch\r\n\r\n\r\n\r\nIt sustained 2 hours run (manually stopped as it's expected to run more), with the latest compact batch file size as 2.2G. The actual memory usage didn't even go up to 1.2G, and be cleaned up soon without outstanding GC activity.","created_at":"2020-06-23T08:11:52Z","updated_at":"2020-08-21T06:08:49Z","closed_at":"2020-08-20T09:31:51Z","merged_at":null,"merge_commit_sha":"2c80ffbfd5361588347df23bc47ed258933331e2","assignee":null,"assignees":[],"requested_reviewers":[],"requested_teams":[],"labels":[{"id":1405794576,"node_id":"MDU6TGFiZWwxNDA1Nzk0NTc2","url":"https://api.github.com/repos/apache/spark/labels/SQL","name":"SQL","color":"ededed","default":false,"description":null},{"id":1406587328,"node_id":"MDU6TGFiZWwxNDA2NTg3MzI4","url":"https://api.github.com/repos/apache/spark/labels/STRUCTURED%20STREAMING","name":"STRUCTURED STREAMING","color":"ededed","default":false,"description":null}],"milestone":null,"draft":false,"commits_url":"https://api.github.com/repos/apache/spark/pulls/28904/commits","review_comments_url":"https://api.github.com/repos/apache/spark/pulls/28904/comments","review_comment_url":"https://api.github.com/repos/apache/spark/pulls/comments{/number}","comments_url":"https://api.github.com/repos/apache/spark/issues/28904/comments","statuses_url":"https://api.github.com/repos/apache/spark/statuses/e16ebe4e530d3c44bb0ba39981c4ec2287c3589e","head":{"label":"HeartSaVioR:SPARK-30462","ref":"SPARK-30462","sha":"e16ebe4e530d3c44bb0ba39981c4ec2287c3589e","user":{"login":"HeartSaVioR","id":1317309,"node_id":"MDQ6VXNlcjEzMTczMDk=","avatar_url":"https://avatars.githubusercontent.com/u/1317309?v=4","gravatar_id":"","url":"https://api.github.com/users/HeartSaVioR","html_url":"https://github.com/HeartSaVioR","followers_url":"https://api.github.com/users/HeartSaVioR/followers","following_url":"https://api.github.com/users/HeartSaVioR/following{/other_user}","gists_url":"https://api.github.com/users/HeartSaVioR/gists{/gist_id}","starred_url":"https://api.github.com/users/HeartSaVioR/starred{/owner}{/repo}","subscriptions_url":"https://api.github.com/users/HeartSaVioR/subscriptions","organizations_url":"https://api.github.com/users/HeartSaVioR/orgs","repos_url":"https://api.github.com/users/HeartSaVioR/repos","events_url":"https://api.github.com/users/HeartSaVioR/events{/privacy}","received_events_url":"https://api.github.com/users/HeartSaVioR/received_events","type":"User","site_admin":false},"repo":{"id":27994296,"node_id":"MDEwOlJlcG9zaXRvcnkyNzk5NDI5Ng==","name":"spark","full_name":"HeartSaVioR/spark","private":false,"owner":{"login":"HeartSaVioR","id":1317309,"node_id":"MDQ6VXNlcjEzMTczMDk=","avatar_url":"https://avatars.githubusercontent.com/u/1317309?v=4","gravatar_id":"","url":"https://api.github.com/users/HeartSaVioR","html_url":"https://github.com/HeartSaVioR","followers_url":"https://api.github.com/users/HeartSaVioR/followers","following_url":"https://api.github.com/users/HeartSaVioR/following{/other_user}","gists_url":"https://api.github.com/users/HeartSaVioR/gists{/gist_id}","starred_url":"https://api.github.com/users/HeartSaVioR/starred{/owner}{/repo}","subscriptions_url":"https://api.github.com/users/HeartSaVioR/subscriptions","organizations_url":"https://api.github.com/users/HeartSaVioR/orgs","repos_url":"https://api.github.com/users/HeartSaVioR/repos","events_url":"https://api.github.com/users/HeartSaVioR/events{/privacy}","received_events_url":"https://api.github.com/users/HeartSaVioR/received_events","type":"User","site_admin":false},"html_url":"https://github.com/HeartSaVioR/spark","description":"Mirror of Apache Spark","fork":true,"url":"https://api.github.com/repos/HeartSaVioR/spark","forks_url":"https://api.github.com/repos/HeartSaVioR/spark/forks","keys_url":"https://api.github.com/repos/HeartSaVioR/spark/keys{/key_id}","collaborators_url":"https://api.github.com/repos/HeartSaVioR/spark/collaborators{/collaborator}","teams_url":"https://api.github.com/repos/HeartSaVioR/spark/teams","hooks_url":"https://api.github.com/repos/HeartSaVioR/spark/hooks","issue_events_url":"https://api.github.com/repos/HeartSaVioR/spark/issues/events{/number}","events_url":"https://api.github.com/repos/HeartSaVioR/spark/events","assignees_url":"https://api.github.com/repos/HeartSaVioR/spark/assignees{/user}","branches_url":"https://api.github.com/repos/HeartSaVioR/spark/branches{/branch}","tags_url":"https://api.github.com/repos/HeartSaVioR/spark/tags","blobs_url":"https://api.github.com/repos/HeartSaVioR/spark/git/blobs{/sha}","git_tags_url":"https://api.github.com/repos/HeartSaVioR/spark/git/tags{/sha}","git_refs_url":"https://api.github.com/repos/HeartSaVioR/spark/git/refs{/sha}","trees_url":"https://api.github.com/repos/HeartSaVioR/spark/git/trees{/sha}","statuses_url":"https://api.github.com/repos/HeartSaVioR/spark/statuses/{sha}","languages_url":"https://api.github.com/repos/HeartSaVioR/spark/languages","stargazers_url":"https://api.github.com/repos/HeartSaVioR/spark/stargazers","contributors_url":"https://api.github.com/repos/HeartSaVioR/spark/contributors","subscribers_url":"https://api.github.com/repos/HeartSaVioR/spark/subscribers","subscription_url":"https://api.github.com/repos/HeartSaVioR/spark/subscription","commits_url":"https://api.github.com/repos/HeartSaVioR/spark/commits{/sha}","git_commits_url":"https://api.github.com/repos/HeartSaVioR/spark/git/commits{/sha}","comments_url":"https://api.github.com/repos/HeartSaVioR/spark/comments{/number}","issue_comment_url":"https://api.github.com/repos/HeartSaVioR/spark/issues/comments{/number}","contents_url":"https://api.github.com/repos/HeartSaVioR/spark/contents/{+path}","compare_url":"https://api.github.com/repos/HeartSaVioR/spark/compare/{base}...{head}","merges_url":"https://api.github.com/repos/HeartSaVioR/spark/merges","archive_url":"https://api.github.com/repos/HeartSaVioR/spark/{archive_format}{/ref}","downloads_url":"https://api.github.com/repos/HeartSaVioR/spark/downloads","issues_url":"https://api.github.com/repos/HeartSaVioR/spark/issues{/number}","pulls_url":"https://api.github.com/repos/HeartSaVioR/spark/pulls{/number}","milestones_url":"https://api.github.com/repos/HeartSaVioR/spark/milestones{/number}","notifications_url":"https://api.github.com/repos/HeartSaVioR/spark/notifications{?since,all,participating}","labels_url":"https://api.github.com/repos/HeartSaVioR/spark/labels{/name}","releases_url":"https://api.github.com/repos/HeartSaVioR/spark/releases{/id}","deployments_url":"https://api.github.com/repos/HeartSaVioR/spark/deployments","created_at":"2014-12-14T12:56:04Z","updated_at":"2023-01-31T16:36:06Z","pushed_at":"2024-03-10T22:59:45Z","git_url":"git://github.com/HeartSaVioR/spark.git","ssh_url":"git@github.com:HeartSaVioR/spark.git","clone_url":"https://github.com/HeartSaVioR/spark.git","svn_url":"https://github.com/HeartSaVioR/spark","homepage":null,"size":400159,"stargazers_count":1,"watchers_count":1,"language":"Scala","has_issues":false,"has_projects":true,"has_downloads":true,"has_wiki":false,"has_pages":false,"has_discussions":false,"forks_count":2,"mirror_url":null,"archived":false,"disabled":false,"open_issues_count":1,"license":{"key":"apache-2.0","name":"Apache License 2.0","spdx_id":"Apache-2.0","url":"https://api.github.com/licenses/apache-2.0","node_id":"MDc6TGljZW5zZTI="},"allow_forking":true,"is_template":false,"web_commit_signoff_required":false,"topics":[],"visibility":"public","forks":2,"open_issues":1,"watchers":1,"default_branch":"master"}},"base":{"label":"apache:master","ref":"master","sha":"c28a6fa5112c9ba3839f52b737266f24fdfcf75b","user":{"login":"apache","id":47359,"node_id":"MDEyOk9yZ2FuaXphdGlvbjQ3MzU5","avatar_url":"https://avatars.githubusercontent.com/u/47359?v=4","gravatar_id":"","url":"https://api.github.com/users/apache","html_url":"https://github.com/apache","followers_url":"https://api.github.com/users/apache/followers","following_url":"https://api.github.com/users/apache/following{/other_user}","gists_url":"https://api.github.com/users/apache/gists{/gist_id}","starred_url":"https://api.github.com/users/apache/starred{/owner}{/repo}","subscriptions_url":"https://api.github.com/users/apache/subscriptions","organizations_url":"https://api.github.com/users/apache/orgs","repos_url":"https://api.github.com/users/apache/repos","events_url":"https://api.github.com/users/apache/events{/privacy}","received_events_url":"https://api.github.com/users/apache/received_events","type":"Organization","site_admin":false},"repo":{"id":17165658,"node_id":"MDEwOlJlcG9zaXRvcnkxNzE2NTY1OA==","name":"spark","full_name":"apache/spark","private":false,"owner":{"login":"apache","id":47359,"node_id":"MDEyOk9yZ2FuaXphdGlvbjQ3MzU5","avatar_url":"https://avatars.githubusercontent.com/u/47359?v=4","gravatar_id":"","url":"https://api.github.com/users/apache","html_url":"https://github.com/apache","followers_url":"https://api.github.com/users/apache/followers","following_url":"https://api.github.com/users/apache/following{/other_user}","gists_url":"https://api.github.com/users/apache/gists{/gist_id}","starred_url":"https://api.github.com/users/apache/starred{/owner}{/repo}","subscriptions_url":"https://api.github.com/users/apache/subscriptions","organizations_url":"https://api.github.com/users/apache/orgs","repos_url":"https://api.github.com/users/apache/repos","events_url":"https://api.github.com/users/apache/events{/privacy}","received_events_url":"https://api.github.com/users/apache/received_events","type":"Organization","site_admin":false},"html_url":"https://github.com/apache/spark","description":"Apache Spark - A unified analytics engine for large-scale data processing","fork":false,"url":"https://api.github.com/repos/apache/spark","forks_url":"https://api.github.com/repos/apache/spark/forks","keys_url":"https://api.github.com/repos/apache/spark/keys{/key_id}","collaborators_url":"https://api.github.com/repos/apache/spark/collaborators{/collaborator}","teams_url":"https://api.github.com/repos/apache/spark/teams","hooks_url":"https://api.github.com/repos/apache/spark/hooks","issue_events_url":"https://api.github.com/repos/apache/spark/issues/events{/number}","events_url":"https://api.github.com/repos/apache/spark/events","assignees_url":"https://api.github.com/repos/apache/spark/assignees{/user}","branches_url":"https://api.github.com/repos/apache/spark/branches{/branch}","tags_url":"https://api.github.com/repos/apache/spark/tags","blobs_url":"https://api.github.com/repos/apache/spark/git/blobs{/sha}","git_tags_url":"https://api.github.com/repos/apache/spark/git/tags{/sha}","git_refs_url":"https://api.github.com/repos/apache/spark/git/refs{/sha}","trees_url":"https://api.github.com/repos/apache/spark/git/trees{/sha}","statuses_url":"https://api.github.com/repos/apache/spark/statuses/{sha}","languages_url":"https://api.github.com/repos/apache/spark/languages","stargazers_url":"https://api.github.com/repos/apache/spark/stargazers","contributors_url":"https://api.github.com/repos/apache/spark/contributors","subscribers_url":"https://api.github.com/repos/apache/spark/subscribers","subscription_url":"https://api.github.com/repos/apache/spark/subscription","commits_url":"https://api.github.com/repos/apache/spark/commits{/sha}","git_commits_url":"https://api.github.com/repos/apache/spark/git/commits{/sha}","comments_url":"https://api.github.com/repos/apache/spark/comments{/number}","issue_comment_url":"https://api.github.com/repos/apache/spark/issues/comments{/number}","contents_url":"https://api.github.com/repos/apache/spark/contents/{+path}","compare_url":"https://api.github.com/repos/apache/spark/compare/{base}...{head}","merges_url":"https://api.github.com/repos/apache/spark/merges","archive_url":"https://api.github.com/repos/apache/spark/{archive_format}{/ref}","downloads_url":"https://api.github.com/repos/apache/spark/downloads","issues_url":"https://api.github.com/repos/apache/spark/issues{/number}","pulls_url":"https://api.github.com/repos/apache/spark/pulls{/number}","milestones_url":"https://api.github.com/repos/apache/spark/milestones{/number}","notifications_url":"https://api.github.com/repos/apache/spark/notifications{?since,all,participating}","labels_url":"https://api.github.com/repos/apache/spark/labels{/name}","releases_url":"https://api.github.com/repos/apache/spark/releases{/id}","deployments_url":"https://api.github.com/repos/apache/spark/deployments","created_at":"2014-02-25T08:00:08Z","updated_at":"2024-03-28T14:09:24Z","pushed_at":"2024-03-29T00:26:39Z","git_url":"git://github.com/apache/spark.git","ssh_url":"git@github.com:apache/spark.git","clone_url":"https://github.com/apache/spark.git","svn_url":"https://github.com/apache/spark","homepage":"https://spark.apache.org/","size":427558,"stargazers_count":38137,"watchers_count":38137,"language":"Scala","has_issues":false,"has_projects":true,"has_downloads":true,"has_wiki":false,"has_pages":false,"has_discussions":false,"forks_count":27873,"mirror_url":null,"archived":false,"disabled":false,"open_issues_count":188,"license":{"key":"apache-2.0","name":"Apache License 2.0","spdx_id":"Apache-2.0","url":"https://api.github.com/licenses/apache-2.0","node_id":"MDc6TGljZW5zZTI="},"allow_forking":true,"is_template":false,"web_commit_signoff_required":false,"topics":["big-data","java","jdbc","python","r","scala","spark","sql"],"visibility":"public","forks":27873,"open_issues":188,"watchers":38137,"default_branch":"master"}},"_links":{"self":{"href":"https://api.github.com/repos/apache/spark/pulls/28904"},"html":{"href":"https://github.com/apache/spark/pull/28904"},"issue":{"href":"https://api.github.com/repos/apache/spark/issues/28904"},"comments":{"href":"https://api.github.com/repos/apache/spark/issues/28904/comments"},"review_comments":{"href":"https://api.github.com/repos/apache/spark/pulls/28904/comments"},"review_comment":{"href":"https://api.github.com/repos/apache/spark/pulls/comments{/number}"},"commits":{"href":"https://api.github.com/repos/apache/spark/pulls/28904/commits"},"statuses":{"href":"https://api.github.com/repos/apache/spark/statuses/e16ebe4e530d3c44bb0ba39981c4ec2287c3589e"}},"author_association":"CONTRIBUTOR","auto_merge":null,"active_lock_reason":null,"merged":false,"mergeable":null,"rebaseable":null,"mergeable_state":"unknown","merged_by":null,"comments":69,"review_comments":39,"maintainer_can_modify":false,"commits":2,"additions":129,"deletions":87,"changed_files":7}